Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/WatchersForTemplateClass.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/WatchersForTemplateClass.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/WatchersForTemplateClass.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/WatchersForTemplateClass.java Sun Jul 5 11:41:39 2020 @@ -1,133 +1,135 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger; - -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * Holds a collection of <code>TemplateHandle</code>s who's templates - * are all of exactly the same class. Unless otherwise noted all - * methods are thread safe. This method provides the linkage between - * <code>TemplateHandle</code>s and <code>TransitionWatchers</code> - * and for the most part is not visible to the clients of either. - */ -class WatchersForTemplateClass { - /** All the templates we know about */ - private final Queue<TemplateHandle> content = new ConcurrentLinkedQueue<TemplateHandle>(); - - /** The OutriggerServerImpl we belong to */ - private final OutriggerServerImpl owner; - - /** - * Create a new <code>WatchersForTemplateClass</code> object - * associated with the specified <code>TransitionWatchers</code> object. - * @param owner The <code>OutriggerServerImpl</code> that - * this object will be a part of. - * @throws NullPointerException if <code>owner</code> is - * <code>null</code>. - */ - WatchersForTemplateClass(OutriggerServerImpl owner) { - if (owner == null) - throw new NullPointerException("owner must be non-null"); - this.owner = owner; - } - - /** - * Add a <code>TransitionWatcher</code> to the list - * of watchers looking for visibility transitions in - * entries that match the specified template. Associates - * a <code>TemplateHandle</code> using - * <code>TransitionWatcher.setTemplateHandle</code> method. - * - * @param watcher The <code>TransitionWatcher</code> being added. - * @param template The <code>EntryRep</code> that represents - * the template of interest. - * @throws NullPointerException if either argument is - * <code>null</code>. - */ - void add(TransitionWatcher watcher, EntryRep template) { - /* We try to find an existing handle, but it is ok - * if we have more than one with the same template. - * It isn't possible to add a watcher to a removed - * handle even if present during iteration. - */ - for(TemplateHandle handle : content) { - if (template.equals(handle.rep()) && - handle.addTransitionWatcher(watcher)) return; - } - - TemplateHandle handle = new TemplateHandle(template, owner, content); - if (handle.addTransitionWatcher(watcher)) content.add(handle); - // else the new handle is discarded. - } - - /** - * Iterate over the watchers associated with - * this object calling <code>isInterested</code> on each - * and if it returns <code>true</code> adding the watcher to the - * passed set. - * - * @param set The set to accumulate interested watchers - * into. - * @param transition The transition being processed. - * @param ordinal The ordinal associated with <code>transition</code>. - * @throws NullPointerException if either argument is <code>null</code>. - */ - void collectInterested(Set<TransitionWatcher> set, EntryTransition transition, - long ordinal) - { - final EntryHandle entryHandle = transition.getHandle(); - final EntryRep rep = entryHandle.rep(); - final long entryHash = entryHandle.hash(); - final int repNumFields = rep.numFields(); - - /* Look at each of handles, check to see if they match - * the changed entry and if they do ask them to - * put the appropriate watchers in the set. - */ - for (TemplateHandle handle : content) - { - // See the if handle mask is incompatible - EntryHandleTmplDesc desc = handle.descFor(repNumFields); // final - - if ((entryHash & desc.mask) != desc.hash) continue; - - if (handle.matches(rep)) handle.collectInterested(set, transition, ordinal); - } - } - - /** - * Visit each <code>TransitionWatcher</code> and check to see if - * it has expired, removing it if it has. Also reaps the - * <code>FastList</code> associated with this object. - * @param now an estimate of the current time expressed as - * milliseconds since the beginning of the epoch. - */ - void reap(long now) { - // First remove empty handles - for (TemplateHandle handle : content) - { - // Dump any expired watchers. - handle.reap(now); - handle.removeIfEmpty(); - } - } -} - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger; + +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.river.outrigger.proxy.EntryRep; + + +/** + * Holds a collection of <code>TemplateHandle</code>s who's templates + * are all of exactly the same class. Unless otherwise noted all + * methods are thread safe. This method provides the linkage between + * <code>TemplateHandle</code>s and <code>TransitionWatchers</code> + * and for the most part is not visible to the clients of either. + */ +class WatchersForTemplateClass { + /** All the templates we know about */ + private final Queue<TemplateHandle> content = new ConcurrentLinkedQueue<TemplateHandle>(); + + /** The OutriggerServerImpl we belong to */ + private final OutriggerServerImpl owner; + + /** + * Create a new <code>WatchersForTemplateClass</code> object + * associated with the specified <code>TransitionWatchers</code> object. + * @param owner The <code>OutriggerServerImpl</code> that + * this object will be a part of. + * @throws NullPointerException if <code>owner</code> is + * <code>null</code>. + */ + WatchersForTemplateClass(OutriggerServerImpl owner) { + if (owner == null) + throw new NullPointerException("owner must be non-null"); + this.owner = owner; + } + + /** + * Add a <code>TransitionWatcher</code> to the list + * of watchers looking for visibility transitions in + * entries that match the specified template. Associates + * a <code>TemplateHandle</code> using + * <code>TransitionWatcher.setTemplateHandle</code> method. + * + * @param watcher The <code>TransitionWatcher</code> being added. + * @param template The <code>EntryRep</code> that represents + * the template of interest. + * @throws NullPointerException if either argument is + * <code>null</code>. + */ + void add(TransitionWatcher watcher, EntryRep template) { + /* We try to find an existing handle, but it is ok + * if we have more than one with the same template. + * It isn't possible to add a watcher to a removed + * handle even if present during iteration. + */ + for(TemplateHandle handle : content) { + if (template.equals(handle.rep()) && + handle.addTransitionWatcher(watcher)) return; + } + + TemplateHandle handle = new TemplateHandle(template, owner, content); + if (handle.addTransitionWatcher(watcher)) content.add(handle); + // else the new handle is discarded. + } + + /** + * Iterate over the watchers associated with + * this object calling <code>isInterested</code> on each + * and if it returns <code>true</code> adding the watcher to the + * passed set. + * + * @param set The set to accumulate interested watchers + * into. + * @param transition The transition being processed. + * @param ordinal The ordinal associated with <code>transition</code>. + * @throws NullPointerException if either argument is <code>null</code>. + */ + void collectInterested(Set<TransitionWatcher> set, EntryTransition transition, + long ordinal) + { + final EntryHandle entryHandle = transition.getHandle(); + final EntryRep rep = entryHandle.rep(); + final long entryHash = entryHandle.hash(); + final int repNumFields = rep.numFields(); + + /* Look at each of handles, check to see if they match + * the changed entry and if they do ask them to + * put the appropriate watchers in the set. + */ + for (TemplateHandle handle : content) + { + // See the if handle mask is incompatible + EntryHandleTmplDesc desc = handle.descFor(repNumFields); // final + + if ((entryHash & desc.mask) != desc.hash) continue; + + if (handle.matches(rep)) handle.collectInterested(set, transition, ordinal); + } + } + + /** + * Visit each <code>TransitionWatcher</code> and check to see if + * it has expired, removing it if it has. Also reaps the + * <code>FastList</code> associated with this object. + * @param now an estimate of the current time expressed as + * milliseconds since the beginning of the epoch. + */ + void reap(long now) { + // First remove empty handles + for (TemplateHandle handle : content) + { + // Dump any expired watchers. + handle.reap(now); + handle.removeIfEmpty(); + } + } +} +
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/pom.xml URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/pom.xml?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/pom.xml (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/pom.xml Sun Jul 5 11:41:39 2020 @@ -1,39 +1,47 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -~ Copyright (C) 2014 the original author or authors. -~ -~ Licensed under the Apache License, Version 2.0 (the "License"); -~ you may not use this file except in compliance with the License. -~ You may obtain a copy of the License at -~ -~ http://www.apache.org/licenses/LICENSE-2.0 -~ -~ Unless required by applicable law or agreed to in writing, software -~ distributed under the License is distributed on an "AS IS" BASIS, -~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -~ See the License for the specific language governing permissions and -~ limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.river</groupId> - <artifactId>outrigger</artifactId> - <version>3.0-SNAPSHOT</version> - </parent> - - <groupId>org.apache.river.outrigger</groupId> - <artifactId>outrigger-snaplogstore</artifactId> - <url>http://river.apache.org</url> - <name>Module :: Outrigger Snaplogstore</name> - - <dependencies> - <dependency> - <groupId>org.apache.river.outrigger</groupId> - <artifactId>outrigger-service</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - -</project> +<?xml version="1.0" encoding="UTF-8"?> +<!-- ~ Copyright (C) 2014 the original author or authors. ~ ~ Licensed under + the Apache License, Version 2.0 (the "License"); ~ you may not use this file + except in compliance with the License. ~ You may obtain a copy of the License + at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by + applicable law or agreed to in writing, software ~ distributed under the + License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. ~ See the License for the specific + language governing permissions and ~ limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.river</groupId> + <artifactId>outrigger</artifactId> + <version>3.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.river.outrigger</groupId> + <artifactId>outrigger-snaplogstore</artifactId> + <url>http://river.apache.org</url> + <name>Module :: Outrigger Snaplogstore</name> + + <dependencies> + + <dependency> + <groupId>org.apache.river.outrigger</groupId> + <artifactId>outrigger-service</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.river.outrigger</groupId> + <artifactId>outrigger-dl</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.river</groupId> + <artifactId>river-logging</artifactId> + <version>${project.version}</version> + </dependency> + + </dependencies> + +</project> Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/BaseObject.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/BaseObject.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/BaseObject.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/BaseObject.java Sun Jul 5 11:41:39 2020 @@ -1,81 +1,81 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger.snaplogstore; - -import org.apache.river.outrigger.StorableObject; -import org.apache.river.outrigger.StoredObject; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import net.jini.space.InternalSpaceException; - -/** - * Top level wrapper class for persisting outrigger objects. - * The target object is serialized and stored here as a byte - * array. - */ -class BaseObject<T extends StorableObject<T>> implements StoredObject<T>, Serializable { - static final long serialVersionUID = -400804064969360164L; - - /** - * @serialField containing a binary blob. - */ - private final byte[] blob; - - BaseObject(T object) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - object.store(oos); - oos.flush(); - blob = baos.toByteArray(); - oos.close(); - } catch (IOException e) { - throw new InternalSpaceException("Exception serializing resource", e); - } - } - - public T restore(T object) - throws IOException, ClassNotFoundException { - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(blob)); - T result = object.restore(ois); - ois.close(); - return result; - } - - /** - * Added to enable the serial form to be modified - * in a backward compatible manner (if necessary) with 3.0.0 and later. - * Modified serial form would be a breaking change for versions - * prior to 3.0.0 - * - * @serialData - * @param ois - * @throws IOException - * @throws ClassNotFoundException - * @since 3.0.0 - */ - private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { - ois.defaultReadObject(); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger.snaplogstore; + +import org.apache.river.outrigger.proxy.StorableObject; +import org.apache.river.outrigger.StoredObject; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import net.jini.space.InternalSpaceException; + +/** + * Top level wrapper class for persisting outrigger objects. + * The target object is serialized and stored here as a byte + * array. + */ +class BaseObject<T extends StorableObject<T>> implements StoredObject<T>, Serializable { + static final long serialVersionUID = -400804064969360164L; + + /** + * @serialField containing a binary blob. + */ + private final byte[] blob; + + BaseObject(T object) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + object.store(oos); + oos.flush(); + blob = baos.toByteArray(); + oos.close(); + } catch (IOException e) { + throw new InternalSpaceException("Exception serializing resource", e); + } + } + + public T restore(T object) + throws IOException, ClassNotFoundException { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(blob)); + T result = object.restore(ois); + ois.close(); + return result; + } + + /** + * Added to enable the serial form to be modified + * in a backward compatible manner (if necessary) with 3.0.0 and later. + * Modified serial form would be a breaking change for versions + * prior to 3.0.0 + * + * @serialData + * @param ois + * @throws IOException + * @throws ClassNotFoundException + * @since 3.0.0 + */ + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + ois.defaultReadObject(); + } +} Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/LogOutputFile.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/LogOutputFile.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/LogOutputFile.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/LogOutputFile.java Sun Jul 5 11:41:39 2020 @@ -1,520 +1,520 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger.snaplogstore; - -import org.apache.river.outrigger.LogOps; -import org.apache.river.outrigger.OutriggerServerImpl; -import org.apache.river.outrigger.StorableObject; -import org.apache.river.outrigger.StorableResource; -import net.jini.id.Uuid; - -import java.io.File; -import java.io.FileDescriptor; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.Observable; -import java.util.logging.Level; -import java.util.logging.Logger; - -import net.jini.space.InternalSpaceException; - -/** - * A class to write a log file, to be read later by - * <code>LogInputFile</code>. Each operation on the file is forced to - * disk, so when the operation logging function returns, the data is - * committed to the log in a recoverable way. - * <p> - * <code>LogOutputFile</code> cannot extend <code>Observable</code> - * because it must extend <code>LogFile</code> (clearly - * <code>Observable</code> should have been an interface). It acts as - * an <code>Observable</code> by having a method that returns its - * "observable part", which is an object that reports observable - * events. Right now the only observable event is the switching to a - * new physical file when the current one becomes full. - * - * @author Sun Microsystems, Inc. - * @see LogInputFile - * @see java.util.Observable - */ -class LogOutputFile extends LogFile implements LogOps { - private volatile RandomAccessFile logFile = null;// the current output log file - private volatile FileDescriptor logFD; // the current log file descriptor - private volatile ObjectOutputStream out; // objects written - private volatile int suffix; // the current suffix number - private volatile int opCnt; // number of ops on current file - private volatile int maxOps; // max ops to allow in file - private volatile Observable observable;// handle Observer/Observable - - private volatile long logBytes = 0; - private final byte[] intBuf = new byte[4]; - private final byte[] zeroBuf = new byte[4]; - - private volatile long deferedUpdateLength = 0; - private volatile long deferedPosition = 0; - - private static final long intBytes = 4; - - /** Logger for logging persistent store related information */ - private static final Logger logger = - Logger.getLogger(OutriggerServerImpl.storeLoggerName); - - /** - * Create a <code>LogOutputFile</code> object that will stream - * output to a series of files described by <code>basePath</code>, - * as interpreted by the relevant <code>LogFile</code> - * constructor. When the file becomes full (the maximum number of - * operations is reached), the file is closed and a new file with - * the next highest suffix is created. The - * <code>Observable</code> notification for this event passes a - * <code>File</code> argument for the filled file as the argument - * to <code>Observer</code>. - * - * @see #observable() - */ - //@see org.apache.river.mercury.LogStream#LogStream(String) - LogOutputFile(String basePath, int maxOps) throws IOException { - super(basePath); - ArrayList inDir = new ArrayList(); - suffix = existingLogs(inDir); - this.maxOps = maxOps; - nextPath(); - } - - /** - * Return an <code>Observable</code> object that represents this object - * in the Observer/Observable pattern. - * - * @see java.util.Observer - */ - Observable observable() { - if (observable == null) { // defer allocation until needed - observable = new Observable() { // we only use this if changed - public void notifyObservers() { - setChanged(); - super.notifyObservers(); - } - public void notifyObservers(Object arg) { - setChanged(); - super.notifyObservers(arg); - } - }; - } - return observable; - } - - /** - * Switch this over to the next path in the list - */ - private void nextPath() throws IOException { - boolean completed = false; - - if (logFile != null) { - - // If there was a deferred header, write it out now - // - if (deferedUpdateLength != 0) { - logFD.sync(); // force the bytes to disk - logFile.seek(deferedPosition); - writeInt((int)deferedUpdateLength); - } - try { - close(); // close the stream and the file - } catch (IOException ignore) { } // assume this is okay - completed = true; - } - - suffix++; // go to next suffix - logFile = new RandomAccessFile(baseDir.getPath() + File.separator + - baseFile + suffix, "rw"); - logFD = logFile.getFD(); - out = new ObjectOutputStream(new LogOutputStream(logFile)); - - writeInt(LOG_VERSION); - - logBytes = logFile.getFilePointer(); - logFile.setLength(logBytes); - - // always start out with zero length header for the next update - logFile.write(zeroBuf); - - // force length header to disk - logFD.sync(); - - deferedUpdateLength = 0; - opCnt = 0; - - /* - * Tell consumer about the completed log. This is done after the - * new one is created so that the old path can be known not - * to be the newest (because something newer is there). - */ - if (observable != null && completed) - observable.notifyObservers(); - } - - /** - * Close the log, but don't remove it. - */ - synchronized void close() throws IOException { - if (logFile != null) { - try { - out.close(); - logFile.close(); - } finally { - logFile = null; - } - } - } - - /** - * Override destroy so we can try to close logFile before calling - * super tries to delete all the files. - */ - void destroy() { - try { - close(); - } catch (Throwable t) { - // Don't let failure keep us from deleting the files we can - } - super.destroy(); - } - - /** - * Log a server boot. - */ - public synchronized void bootOp(long time, long sessionId) { - try { - out.writeByte(BOOT_OP); - out.writeLong(time); - out.writeLong(sessionId); - flush(); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a change in join state - */ - public synchronized void joinStateOp(StorableObject state) { - try { - out.writeByte(JOINSTATE_OP); - out.writeObject(new BaseObject(state)); - flush(); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a <code>write</code> operation. - */ - public synchronized void writeOp(StorableResource entry, Long txnId) { - try { - out.writeByte(WRITE_OP); - out.writeObject(new Resource(entry)); - out.writeObject(txnId); - - // A write operation under a transaction does not need to be - // flushed until it is prepared. - // - flush(txnId == null); - } catch (IOException e) { - failed(e); - } - } - - // Inherit java doc from supertype - public synchronized void writeOp(StorableResource entries[], Long txnId) { - try { - out.writeByte(BATCH_WRITE_OP); - out.writeObject(txnId); - - // In the middle of records we need to use the stream's - // writeInt, not our private one - out.writeInt(entries.length); - for (int i=0; i<entries.length; i++) { - out.writeObject(new Resource(entries[i])); - } - - // A write operation under a transaction does not need to be - // flushed until it is prepared. - // - flush(txnId == null, entries.length); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a <code>take</code> operation. - */ - public synchronized void takeOp(Uuid cookie, Long txnId) { - try { - out.writeByte(TAKE_OP); - cookie.write(out); - out.writeObject(txnId); - - // A take operation under a transaction does not need to be - // flushed until it is prepared. - // - flush(txnId == null); - } catch (IOException e) { - failed(e); - } - } - - // Inherit java doc from supertype - public synchronized void takeOp(Uuid cookies[], Long txnId) { - try { - out.writeByte(BATCH_TAKE_OP); - out.writeObject(txnId); - - // In the middle of records we need to use the stream's - // writeInt, not our private one - out.writeInt(cookies.length); - for (int i=0; i<cookies.length; i++) { - cookies[i].write(out); - } - - // A take operation under a transaction does not need to be - // flushed until it is prepared. - // - flush(txnId == null, cookies.length); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a <code>notify</code> operation. - */ - public synchronized void registerOp(StorableResource registration, - String type, StorableObject[] templates) - { - try { - out.writeByte(REGISTER_OP); - out.writeObject(new Registration(registration, type, templates)); - flush(); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a <code>renew</code> operation. - */ - public synchronized void renewOp(Uuid cookie, long expiration) { - try { - out.writeByte(RENEW_OP); - cookie.write(out); - out.writeLong(expiration); - flush(); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a <code>cancel</code> operation. - */ - public synchronized void cancelOp(Uuid cookie, boolean expired) { - try { - out.writeByte(CANCEL_OP); - cookie.write(out); - - // cancels due to expiration don't need to be flushed - // right away - flush(!expired); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a transaction <code>prepare</code> operation. - */ - public synchronized void prepareOp(Long txnId, - StorableObject transaction) { - try { - out.writeByte(PREPARE_OP); - out.writeObject(txnId); - out.writeObject(new BaseObject(transaction)); - flush(); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a transaction <code>commit</code> operation. - */ - public synchronized void commitOp(Long txnId) { - try { - out.writeByte(COMMIT_OP); - out.writeObject(txnId); - flush(); - } catch (IOException e) { - failed(e); - } - } - - /** - * Log a transaction <code>abort</code> operation. - */ - public synchronized void abortOp(Long txnId) { - try { - out.writeByte(ABORT_OP); - out.writeObject(txnId); - flush(); - } catch (IOException e) { - failed(e); - } - } - - public synchronized void uuidOp(Uuid uuid) { - try { - out.writeByte(UUID_OP); - uuid.write(out); - flush(); - } catch (IOException e) { - failed(e); - } - } - - /** - * Flush the current output after an operation. If the number of - * operations is exceeded, shift over to the next path. - */ - private void flush() throws IOException { - flush(true); - } - - - /** - * Conditionally flush the current output. If the number of - * operations is exceeded, shift over to the next path even if - * <code>forceToDisk</code> is <code>false</code>. - */ - private synchronized void flush(boolean forceToDisk) - throws IOException - { - flush(forceToDisk, 1); - } - - /** - * Conditionally flush the current output. If the number of - * operations is exceeded, shift over to the next path even if - * <code>forceToDisk</code> is <code>false</code>. - */ - private synchronized void flush(boolean forceToDisk, - int effectiveOpCount) - throws IOException - { - assert effectiveOpCount > 0; - - out.flush(); - - if (forceToDisk) { - - // must force contents to disk before writing real length header - logFD.sync(); - } - - long entryEnd = logFile.getFilePointer(); - long updateLen = entryEnd - logBytes - intBytes; - - // If we are not forcing to disk, we want to defer the write of the - // first header. This will leave a zero just after the last sync'ed - // record and will assure that LogInputFile will not read a partially - // written record. - // - if (!forceToDisk) { - - // If this is the first flush(false) we save the header information - // and location for later. Otherwise we write out the header - // normally. - // - if (deferedUpdateLength == 0) { - deferedUpdateLength = updateLen; // save the header length - deferedPosition = logBytes; // and position for later - } else { - // write real length header - logFile.seek(logBytes); - writeInt((int)updateLen); - } - } else { - - // If there was a deferred header, write that out now and - // then write the current header. - // - if (deferedUpdateLength != 0) { - logFile.seek(deferedPosition); - writeInt((int)deferedUpdateLength); - deferedUpdateLength = 0; - } - // write real length header - logFile.seek(logBytes); - writeInt((int)updateLen); - } - - // pad out update record so length header does not span disk blocks - entryEnd = (entryEnd + 3) & ~3L; - - // write zero length header for next update - logFile.seek(entryEnd); - logFile.write(zeroBuf); - logBytes = entryEnd; - - if (forceToDisk) - logFD.sync(); - - opCnt += effectiveOpCount; - if (opCnt >= maxOps) - nextPath(); - else - out.reset(); // not critical to flush this - } - - /** - * Write an int value in single write operation. Note we only use - * this method when writing log file and recored headers. We - * can't use it inside records because the data inside records is - * written/read using <code>ObjectIn/OutputStream</code> and this - * method writes directly to the <code>RandomAccessFile</code>. - * - * @param val int value - * @throws IOException if any other I/O error occurs - */ - private void writeInt(int val) throws IOException { - intBuf[0] = (byte) (val >> 24); - intBuf[1] = (byte) (val >> 16); - intBuf[2] = (byte) (val >> 8); - intBuf[3] = (byte) val; - logFile.write(intBuf); - } - - private void failed(Exception e) throws InternalSpaceException { - logger.log(Level.SEVERE, - "Unexpected I/O error while persisting Space data", - e); - System.exit(-5); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger.snaplogstore; + +import org.apache.river.outrigger.LogOps; +import org.apache.river.outrigger.OutriggerServerImpl; +import org.apache.river.outrigger.proxy.StorableObject; +import org.apache.river.outrigger.proxy.StorableResource; +import net.jini.id.Uuid; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.Observable; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.jini.space.InternalSpaceException; + +/** + * A class to write a log file, to be read later by + * <code>LogInputFile</code>. Each operation on the file is forced to + * disk, so when the operation logging function returns, the data is + * committed to the log in a recoverable way. + * <p> + * <code>LogOutputFile</code> cannot extend <code>Observable</code> + * because it must extend <code>LogFile</code> (clearly + * <code>Observable</code> should have been an interface). It acts as + * an <code>Observable</code> by having a method that returns its + * "observable part", which is an object that reports observable + * events. Right now the only observable event is the switching to a + * new physical file when the current one becomes full. + * + * @author Sun Microsystems, Inc. + * @see LogInputFile + * @see java.util.Observable + */ +class LogOutputFile extends LogFile implements LogOps { + private volatile RandomAccessFile logFile = null;// the current output log file + private volatile FileDescriptor logFD; // the current log file descriptor + private volatile ObjectOutputStream out; // objects written + private volatile int suffix; // the current suffix number + private volatile int opCnt; // number of ops on current file + private volatile int maxOps; // max ops to allow in file + private volatile Observable observable;// handle Observer/Observable + + private volatile long logBytes = 0; + private final byte[] intBuf = new byte[4]; + private final byte[] zeroBuf = new byte[4]; + + private volatile long deferedUpdateLength = 0; + private volatile long deferedPosition = 0; + + private static final long intBytes = 4; + + /** Logger for logging persistent store related information */ + private static final Logger logger = + Logger.getLogger(OutriggerServerImpl.storeLoggerName); + + /** + * Create a <code>LogOutputFile</code> object that will stream + * output to a series of files described by <code>basePath</code>, + * as interpreted by the relevant <code>LogFile</code> + * constructor. When the file becomes full (the maximum number of + * operations is reached), the file is closed and a new file with + * the next highest suffix is created. The + * <code>Observable</code> notification for this event passes a + * <code>File</code> argument for the filled file as the argument + * to <code>Observer</code>. + * + * @see #observable() + */ + //@see org.apache.river.mercury.LogStream#LogStream(String) + LogOutputFile(String basePath, int maxOps) throws IOException { + super(basePath); + ArrayList inDir = new ArrayList(); + suffix = existingLogs(inDir); + this.maxOps = maxOps; + nextPath(); + } + + /** + * Return an <code>Observable</code> object that represents this object + * in the Observer/Observable pattern. + * + * @see java.util.Observer + */ + Observable observable() { + if (observable == null) { // defer allocation until needed + observable = new Observable() { // we only use this if changed + public void notifyObservers() { + setChanged(); + super.notifyObservers(); + } + public void notifyObservers(Object arg) { + setChanged(); + super.notifyObservers(arg); + } + }; + } + return observable; + } + + /** + * Switch this over to the next path in the list + */ + private void nextPath() throws IOException { + boolean completed = false; + + if (logFile != null) { + + // If there was a deferred header, write it out now + // + if (deferedUpdateLength != 0) { + logFD.sync(); // force the bytes to disk + logFile.seek(deferedPosition); + writeInt((int)deferedUpdateLength); + } + try { + close(); // close the stream and the file + } catch (IOException ignore) { } // assume this is okay + completed = true; + } + + suffix++; // go to next suffix + logFile = new RandomAccessFile(baseDir.getPath() + File.separator + + baseFile + suffix, "rw"); + logFD = logFile.getFD(); + out = new ObjectOutputStream(new LogOutputStream(logFile)); + + writeInt(LOG_VERSION); + + logBytes = logFile.getFilePointer(); + logFile.setLength(logBytes); + + // always start out with zero length header for the next update + logFile.write(zeroBuf); + + // force length header to disk + logFD.sync(); + + deferedUpdateLength = 0; + opCnt = 0; + + /* + * Tell consumer about the completed log. This is done after the + * new one is created so that the old path can be known not + * to be the newest (because something newer is there). + */ + if (observable != null && completed) + observable.notifyObservers(); + } + + /** + * Close the log, but don't remove it. + */ + synchronized void close() throws IOException { + if (logFile != null) { + try { + out.close(); + logFile.close(); + } finally { + logFile = null; + } + } + } + + /** + * Override destroy so we can try to close logFile before calling + * super tries to delete all the files. + */ + void destroy() { + try { + close(); + } catch (Throwable t) { + // Don't let failure keep us from deleting the files we can + } + super.destroy(); + } + + /** + * Log a server boot. + */ + public synchronized void bootOp(long time, long sessionId) { + try { + out.writeByte(BOOT_OP); + out.writeLong(time); + out.writeLong(sessionId); + flush(); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a change in join state + */ + public synchronized void joinStateOp(StorableObject state) { + try { + out.writeByte(JOINSTATE_OP); + out.writeObject(new BaseObject(state)); + flush(); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a <code>write</code> operation. + */ + public synchronized void writeOp(StorableResource entry, Long txnId) { + try { + out.writeByte(WRITE_OP); + out.writeObject(new Resource(entry)); + out.writeObject(txnId); + + // A write operation under a transaction does not need to be + // flushed until it is prepared. + // + flush(txnId == null); + } catch (IOException e) { + failed(e); + } + } + + // Inherit java doc from supertype + public synchronized void writeOp(StorableResource entries[], Long txnId) { + try { + out.writeByte(BATCH_WRITE_OP); + out.writeObject(txnId); + + // In the middle of records we need to use the stream's + // writeInt, not our private one + out.writeInt(entries.length); + for (int i=0; i<entries.length; i++) { + out.writeObject(new Resource(entries[i])); + } + + // A write operation under a transaction does not need to be + // flushed until it is prepared. + // + flush(txnId == null, entries.length); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a <code>take</code> operation. + */ + public synchronized void takeOp(Uuid cookie, Long txnId) { + try { + out.writeByte(TAKE_OP); + cookie.write(out); + out.writeObject(txnId); + + // A take operation under a transaction does not need to be + // flushed until it is prepared. + // + flush(txnId == null); + } catch (IOException e) { + failed(e); + } + } + + // Inherit java doc from supertype + public synchronized void takeOp(Uuid cookies[], Long txnId) { + try { + out.writeByte(BATCH_TAKE_OP); + out.writeObject(txnId); + + // In the middle of records we need to use the stream's + // writeInt, not our private one + out.writeInt(cookies.length); + for (int i=0; i<cookies.length; i++) { + cookies[i].write(out); + } + + // A take operation under a transaction does not need to be + // flushed until it is prepared. + // + flush(txnId == null, cookies.length); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a <code>notify</code> operation. + */ + public synchronized void registerOp(StorableResource registration, + String type, StorableObject[] templates) + { + try { + out.writeByte(REGISTER_OP); + out.writeObject(new Registration(registration, type, templates)); + flush(); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a <code>renew</code> operation. + */ + public synchronized void renewOp(Uuid cookie, long expiration) { + try { + out.writeByte(RENEW_OP); + cookie.write(out); + out.writeLong(expiration); + flush(); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a <code>cancel</code> operation. + */ + public synchronized void cancelOp(Uuid cookie, boolean expired) { + try { + out.writeByte(CANCEL_OP); + cookie.write(out); + + // cancels due to expiration don't need to be flushed + // right away + flush(!expired); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a transaction <code>prepare</code> operation. + */ + public synchronized void prepareOp(Long txnId, + StorableObject transaction) { + try { + out.writeByte(PREPARE_OP); + out.writeObject(txnId); + out.writeObject(new BaseObject(transaction)); + flush(); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a transaction <code>commit</code> operation. + */ + public synchronized void commitOp(Long txnId) { + try { + out.writeByte(COMMIT_OP); + out.writeObject(txnId); + flush(); + } catch (IOException e) { + failed(e); + } + } + + /** + * Log a transaction <code>abort</code> operation. + */ + public synchronized void abortOp(Long txnId) { + try { + out.writeByte(ABORT_OP); + out.writeObject(txnId); + flush(); + } catch (IOException e) { + failed(e); + } + } + + public synchronized void uuidOp(Uuid uuid) { + try { + out.writeByte(UUID_OP); + uuid.write(out); + flush(); + } catch (IOException e) { + failed(e); + } + } + + /** + * Flush the current output after an operation. If the number of + * operations is exceeded, shift over to the next path. + */ + private void flush() throws IOException { + flush(true); + } + + + /** + * Conditionally flush the current output. If the number of + * operations is exceeded, shift over to the next path even if + * <code>forceToDisk</code> is <code>false</code>. + */ + private synchronized void flush(boolean forceToDisk) + throws IOException + { + flush(forceToDisk, 1); + } + + /** + * Conditionally flush the current output. If the number of + * operations is exceeded, shift over to the next path even if + * <code>forceToDisk</code> is <code>false</code>. + */ + private synchronized void flush(boolean forceToDisk, + int effectiveOpCount) + throws IOException + { + assert effectiveOpCount > 0; + + out.flush(); + + if (forceToDisk) { + + // must force contents to disk before writing real length header + logFD.sync(); + } + + long entryEnd = logFile.getFilePointer(); + long updateLen = entryEnd - logBytes - intBytes; + + // If we are not forcing to disk, we want to defer the write of the + // first header. This will leave a zero just after the last sync'ed + // record and will assure that LogInputFile will not read a partially + // written record. + // + if (!forceToDisk) { + + // If this is the first flush(false) we save the header information + // and location for later. Otherwise we write out the header + // normally. + // + if (deferedUpdateLength == 0) { + deferedUpdateLength = updateLen; // save the header length + deferedPosition = logBytes; // and position for later + } else { + // write real length header + logFile.seek(logBytes); + writeInt((int)updateLen); + } + } else { + + // If there was a deferred header, write that out now and + // then write the current header. + // + if (deferedUpdateLength != 0) { + logFile.seek(deferedPosition); + writeInt((int)deferedUpdateLength); + deferedUpdateLength = 0; + } + // write real length header + logFile.seek(logBytes); + writeInt((int)updateLen); + } + + // pad out update record so length header does not span disk blocks + entryEnd = (entryEnd + 3) & ~3L; + + // write zero length header for next update + logFile.seek(entryEnd); + logFile.write(zeroBuf); + logBytes = entryEnd; + + if (forceToDisk) + logFD.sync(); + + opCnt += effectiveOpCount; + if (opCnt >= maxOps) + nextPath(); + else + out.reset(); // not critical to flush this + } + + /** + * Write an int value in single write operation. Note we only use + * this method when writing log file and recored headers. We + * can't use it inside records because the data inside records is + * written/read using <code>ObjectIn/OutputStream</code> and this + * method writes directly to the <code>RandomAccessFile</code>. + * + * @param val int value + * @throws IOException if any other I/O error occurs + */ + private void writeInt(int val) throws IOException { + intBuf[0] = (byte) (val >> 24); + intBuf[1] = (byte) (val >> 16); + intBuf[2] = (byte) (val >> 8); + intBuf[3] = (byte) val; + logFile.write(intBuf); + } + + private void failed(Exception e) throws InternalSpaceException { + logger.log(Level.SEVERE, + "Unexpected I/O error while persisting Space data", + e); + System.exit(-5); + } +} Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Registration.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Registration.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Registration.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Registration.java Sun Jul 5 11:41:39 2020 @@ -1,50 +1,50 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger.snaplogstore; - -import org.apache.river.outrigger.StorableObject; -import org.apache.river.outrigger.StorableResource; - -/** - * Wrapper for outrigger event registrations. An event registration - * consists of a leased registration and the matching template. - */ -class Registration extends Resource { - static final long serialVersionUID = 2L; - - private final BaseObject[] templates; - - private final String type; - - Registration(StorableResource chit, String type, StorableObject[] ts) { - super(chit); - this.type = type; - templates = new BaseObject[ts.length]; - for (int i=0; i<templates.length; i++) { - templates[i] = new BaseObject(ts[i]); - } - } - - BaseObject[] getTemplates() { - return templates; - } - - String getType() { - return type; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger.snaplogstore; + +import org.apache.river.outrigger.proxy.StorableObject; +import org.apache.river.outrigger.proxy.StorableResource; + +/** + * Wrapper for outrigger event registrations. An event registration + * consists of a leased registration and the matching template. + */ +class Registration extends Resource { + static final long serialVersionUID = 2L; + + private final BaseObject[] templates; + + private final String type; + + Registration(StorableResource chit, String type, StorableObject[] ts) { + super(chit); + this.type = type; + templates = new BaseObject[ts.length]; + for (int i=0; i<templates.length; i++) { + templates[i] = new BaseObject(ts[i]); + } + } + + BaseObject[] getTemplates() { + return templates; + } + + String getType() { + return type; + } +} Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Resource.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Resource.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Resource.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-snaplogstore/src/main/java/org/apache/river/outrigger/snaplogstore/Resource.java Sun Jul 5 11:41:39 2020 @@ -1,80 +1,80 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger.snaplogstore; - -import org.apache.river.outrigger.StorableResource; -import org.apache.river.outrigger.StoredResource; -import net.jini.id.Uuid; - -import java.io.IOException; -import java.util.Arrays; - -/** - * Wrapper for outrigger objects that are leased resources. - * This class records renews so that the - * stored resource can be updated while the target is serialized. - * When the stored resource is deserialized the (potentially) - * updated expiration is set in the resource before it is returned. - */ -class Resource extends BaseObject implements StoredResource { - static final long serialVersionUID = -4248052947306243840L; - - private final byte[] cookie; - private volatile long expiration; - - Resource(StorableResource resource) { - super(resource); - final Uuid uuid = resource.getCookie(); - cookie = ByteArrayWrapper.toByteArray(uuid); - expiration = resource.getExpiration(); - } - - ByteArrayWrapper getCookieAsWrapper() { - return new ByteArrayWrapper(cookie); - } - - void setExpiration(long newExpiration) { - expiration = newExpiration; - } - - public void restore(StorableResource obj) - throws IOException, ClassNotFoundException - { - super.restore(obj); - - // Set the objects expiration to be the current one (it may still - // be the original expiration) - // - obj.setExpiration(expiration); - } - - public int hashCode() { - return ByteArrayWrapper.hashFor(cookie); - } - - public boolean equals(Object o) { - if (o == null) - return false; - - if (!(o instanceof Resource)) - return false; - - // Same object if same cookie. - return Arrays.equals(cookie, ((Resource)o).cookie); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger.snaplogstore; + +import org.apache.river.outrigger.proxy.StorableResource; +import org.apache.river.outrigger.StoredResource; +import net.jini.id.Uuid; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Wrapper for outrigger objects that are leased resources. + * This class records renews so that the + * stored resource can be updated while the target is serialized. + * When the stored resource is deserialized the (potentially) + * updated expiration is set in the resource before it is returned. + */ +class Resource extends BaseObject implements StoredResource { + static final long serialVersionUID = -4248052947306243840L; + + private final byte[] cookie; + private volatile long expiration; + + Resource(StorableResource resource) { + super(resource); + final Uuid uuid = resource.getCookie(); + cookie = ByteArrayWrapper.toByteArray(uuid); + expiration = resource.getExpiration(); + } + + ByteArrayWrapper getCookieAsWrapper() { + return new ByteArrayWrapper(cookie); + } + + void setExpiration(long newExpiration) { + expiration = newExpiration; + } + + public void restore(StorableResource obj) + throws IOException, ClassNotFoundException + { + super.restore(obj); + + // Set the objects expiration to be the current one (it may still + // be the original expiration) + // + obj.setExpiration(expiration); + } + + public int hashCode() { + return ByteArrayWrapper.hashFor(cookie); + } + + public boolean equals(Object o) { + if (o == null) + return false; + + if (!(o instanceof Resource)) + return false; + + // Same object if same cookie. + return Arrays.equals(cookie, ((Resource)o).cookie); + } +} Modified: river/jtsk/modules/modularize/apache-river/river-services/reggie/reggie-dl/src/main/java/org/apache/river/reggie/proxy/AdminProxy.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/reggie/reggie-dl/src/main/java/org/apache/river/reggie/proxy/AdminProxy.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/reggie/reggie-dl/src/main/java/org/apache/river/reggie/proxy/AdminProxy.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/reggie/reggie-dl/src/main/java/org/apache/river/reggie/proxy/AdminProxy.java Sun Jul 5 11:41:39 2020 @@ -1,238 +1,238 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.reggie; - -import org.apache.river.admin.DestroyAdmin; -import java.io.IOException; -import java.io.InvalidObjectException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamException; -import java.io.Serializable; -import java.rmi.RemoteException; -import net.jini.admin.JoinAdmin; -import net.jini.core.constraint.RemoteMethodControl; -import net.jini.core.discovery.LookupLocator; -import net.jini.core.entry.Entry; -import net.jini.core.lookup.ServiceID; -import net.jini.id.ReferentUuid; -import net.jini.id.ReferentUuids; -import net.jini.id.Uuid; -import net.jini.id.UuidFactory; -import net.jini.lookup.DiscoveryAdmin; - -/** - * Proxy for administering a registrar, returned from the getAdmin method of - * the main registrar proxy. Clients only see instances via the - * DiscoveryAdmin, JoinAdmin, DestroyAdmin and ReferentUuid interfaces. - * - * @author Sun Microsystems, Inc. - * - */ -class AdminProxy - implements DiscoveryAdmin, JoinAdmin, DestroyAdmin, - ReferentUuid, Serializable -{ - private static final long serialVersionUID = 2L; - - /** - * The registrar. - * - * @serial - */ - final Registrar server; - /** - * The registrar's service ID. - */ - transient ServiceID registrarID; - - /** - * Returns AdminProxy or ConstrainableAdminProxy instance, depending on - * whether given server implements RemoteMethodControl. - */ - static AdminProxy getInstance(Registrar server, ServiceID registrarID) { - return (server instanceof RemoteMethodControl) ? - new ConstrainableAdminProxy(server, registrarID, null) : - new AdminProxy(server, registrarID); - } - - /** Constructor for use by getInstance(), ConstrainableAdminProxy. */ - AdminProxy(Registrar server, ServiceID registrarID) { - this.server = server; - this.registrarID = registrarID; - } - - // This method's javadoc is inherited from an interface of this class - public Entry[] getLookupAttributes() throws RemoteException { - return server.getLookupAttributes(); - } - - // This method's javadoc is inherited from an interface of this class - public void addLookupAttributes(Entry[] attrSets) throws RemoteException { - server.addLookupAttributes(attrSets); - } - - // This method's javadoc is inherited from an interface of this class - public void modifyLookupAttributes(Entry[] attrSetTemplates, - Entry[] attrSets) - throws RemoteException - { - server.modifyLookupAttributes(attrSetTemplates, attrSets); - } - - // This method's javadoc is inherited from an interface of this class - public String[] getLookupGroups() throws RemoteException { - return server.getLookupGroups(); - } - - // This method's javadoc is inherited from an interface of this class - public void addLookupGroups(String[] groups) throws RemoteException { - server.addLookupGroups(groups); - } - - // This method's javadoc is inherited from an interface of this class - public void removeLookupGroups(String[] groups) throws RemoteException { - server.removeLookupGroups(groups); - } - - // This method's javadoc is inherited from an interface of this class - public void setLookupGroups(String[] groups) throws RemoteException { - server.setLookupGroups(groups); - } - - // This method's javadoc is inherited from an interface of this class - public LookupLocator[] getLookupLocators() throws RemoteException { - return server.getLookupLocators(); - } - - // This method's javadoc is inherited from an interface of this class - public void addLookupLocators(LookupLocator[] locators) - throws RemoteException - { - server.addLookupLocators(locators); - } - - // This method's javadoc is inherited from an interface of this class - public void removeLookupLocators(LookupLocator[] locators) - throws RemoteException - { - server.removeLookupLocators(locators); - } - - // This method's javadoc is inherited from an interface of this class - public void setLookupLocators(LookupLocator[] locators) - throws RemoteException - { - server.setLookupLocators(locators); - } - - // This method's javadoc is inherited from an interface of this class - public void addMemberGroups(String[] groups) throws RemoteException { - server.addMemberGroups(groups); - } - - // This method's javadoc is inherited from an interface of this class - public void removeMemberGroups(String[] groups) throws RemoteException { - server.removeMemberGroups(groups); - } - - // This method's javadoc is inherited from an interface of this class - public String[] getMemberGroups() throws RemoteException { - return server.getMemberGroups(); - } - - // This method's javadoc is inherited from an interface of this class - public void setMemberGroups(String[] groups) throws RemoteException { - server.setMemberGroups(groups); - } - - // This method's javadoc is inherited from an interface of this class - public int getUnicastPort() throws RemoteException { - return server.getUnicastPort(); - } - - // This method's javadoc is inherited from an interface of this class - public void setUnicastPort(int port) throws IOException, RemoteException { - server.setUnicastPort(port); - } - - // This method's javadoc is inherited from an interface of this class - public void destroy() throws RemoteException { - server.destroy(); - } - - // This method's javadoc is inherited from an interface of this class - public Uuid getReferentUuid() { - return UuidFactory.create(registrarID.getMostSignificantBits(), - registrarID.getLeastSignificantBits()); - } - - /** Returns service ID hash code. */ - public int hashCode() { - return registrarID.hashCode(); - } - - /** Proxies for servers with the same service ID are considered equal. */ - public boolean equals(Object obj) { - return ReferentUuids.compare(this, obj); - } - - /** - * Returns a string created from the proxy class name, the registrar's - * service ID, and the result of the underlying proxy's toString method. - * - * @return String - */ - public String toString() { - return getClass().getName() + "[registrar=" + registrarID - + " " + server + "]"; - } - - /** - * Writes the default serializable field value for this instance, followed - * by the registrar's service ID encoded as specified by the - * ServiceID.writeBytes method. - */ - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - registrarID.writeBytes(out); - } - - /** - * Reads the default serializable field value for this instance, followed - * by the registrar's service ID encoded as specified by the - * ServiceID.writeBytes method. Verifies that the deserialized registrar - * reference is non-null. - */ - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException - { - in.defaultReadObject(); - registrarID = new ServiceID(in); - if (server == null) { - throw new InvalidObjectException("null server"); - } - } - - /** - * Throws InvalidObjectException, since data for this class is required. - */ - private void readObjectNoData() throws ObjectStreamException { - throw new InvalidObjectException("no data"); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.reggie.proxy; + +import org.apache.river.admin.DestroyAdmin; +import java.io.IOException; +import java.io.InvalidObjectException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.rmi.RemoteException; +import net.jini.admin.JoinAdmin; +import net.jini.core.constraint.RemoteMethodControl; +import net.jini.core.discovery.LookupLocator; +import net.jini.core.entry.Entry; +import net.jini.core.lookup.ServiceID; +import net.jini.id.ReferentUuid; +import net.jini.id.ReferentUuids; +import net.jini.id.Uuid; +import net.jini.id.UuidFactory; +import net.jini.lookup.DiscoveryAdmin; + +/** + * Proxy for administering a registrar, returned from the getAdmin method of + * the main registrar proxy. Clients only see instances via the + * DiscoveryAdmin, JoinAdmin, DestroyAdmin and ReferentUuid interfaces. + * + * @author Sun Microsystems, Inc. + * + */ +public class AdminProxy + implements DiscoveryAdmin, JoinAdmin, DestroyAdmin, + ReferentUuid, Serializable +{ + private static final long serialVersionUID = 2L; + + /** + * The registrar. + * + * @serial + */ + final Registrar server; + /** + * The registrar's service ID. + */ + transient ServiceID registrarID; + + /** + * Returns AdminProxy or ConstrainableAdminProxy instance, depending on + * whether given server implements RemoteMethodControl. + */ + public static AdminProxy getInstance(Registrar server, ServiceID registrarID) { + return (server instanceof RemoteMethodControl) ? + new ConstrainableAdminProxy(server, registrarID, null) : + new AdminProxy(server, registrarID); + } + + /** Constructor for use by getInstance(), ConstrainableAdminProxy. */ + AdminProxy(Registrar server, ServiceID registrarID) { + this.server = server; + this.registrarID = registrarID; + } + + // This method's javadoc is inherited from an interface of this class + public Entry[] getLookupAttributes() throws RemoteException { + return server.getLookupAttributes(); + } + + // This method's javadoc is inherited from an interface of this class + public void addLookupAttributes(Entry[] attrSets) throws RemoteException { + server.addLookupAttributes(attrSets); + } + + // This method's javadoc is inherited from an interface of this class + public void modifyLookupAttributes(Entry[] attrSetTemplates, + Entry[] attrSets) + throws RemoteException + { + server.modifyLookupAttributes(attrSetTemplates, attrSets); + } + + // This method's javadoc is inherited from an interface of this class + public String[] getLookupGroups() throws RemoteException { + return server.getLookupGroups(); + } + + // This method's javadoc is inherited from an interface of this class + public void addLookupGroups(String[] groups) throws RemoteException { + server.addLookupGroups(groups); + } + + // This method's javadoc is inherited from an interface of this class + public void removeLookupGroups(String[] groups) throws RemoteException { + server.removeLookupGroups(groups); + } + + // This method's javadoc is inherited from an interface of this class + public void setLookupGroups(String[] groups) throws RemoteException { + server.setLookupGroups(groups); + } + + // This method's javadoc is inherited from an interface of this class + public LookupLocator[] getLookupLocators() throws RemoteException { + return server.getLookupLocators(); + } + + // This method's javadoc is inherited from an interface of this class + public void addLookupLocators(LookupLocator[] locators) + throws RemoteException + { + server.addLookupLocators(locators); + } + + // This method's javadoc is inherited from an interface of this class + public void removeLookupLocators(LookupLocator[] locators) + throws RemoteException + { + server.removeLookupLocators(locators); + } + + // This method's javadoc is inherited from an interface of this class + public void setLookupLocators(LookupLocator[] locators) + throws RemoteException + { + server.setLookupLocators(locators); + } + + // This method's javadoc is inherited from an interface of this class + public void addMemberGroups(String[] groups) throws RemoteException { + server.addMemberGroups(groups); + } + + // This method's javadoc is inherited from an interface of this class + public void removeMemberGroups(String[] groups) throws RemoteException { + server.removeMemberGroups(groups); + } + + // This method's javadoc is inherited from an interface of this class + public String[] getMemberGroups() throws RemoteException { + return server.getMemberGroups(); + } + + // This method's javadoc is inherited from an interface of this class + public void setMemberGroups(String[] groups) throws RemoteException { + server.setMemberGroups(groups); + } + + // This method's javadoc is inherited from an interface of this class + public int getUnicastPort() throws RemoteException { + return server.getUnicastPort(); + } + + // This method's javadoc is inherited from an interface of this class + public void setUnicastPort(int port) throws IOException, RemoteException { + server.setUnicastPort(port); + } + + // This method's javadoc is inherited from an interface of this class + public void destroy() throws RemoteException { + server.destroy(); + } + + // This method's javadoc is inherited from an interface of this class + public Uuid getReferentUuid() { + return UuidFactory.create(registrarID.getMostSignificantBits(), + registrarID.getLeastSignificantBits()); + } + + /** Returns service ID hash code. */ + public int hashCode() { + return registrarID.hashCode(); + } + + /** Proxies for servers with the same service ID are considered equal. */ + public boolean equals(Object obj) { + return ReferentUuids.compare(this, obj); + } + + /** + * Returns a string created from the proxy class name, the registrar's + * service ID, and the result of the underlying proxy's toString method. + * + * @return String + */ + public String toString() { + return getClass().getName() + "[registrar=" + registrarID + + " " + server + "]"; + } + + /** + * Writes the default serializable field value for this instance, followed + * by the registrar's service ID encoded as specified by the + * ServiceID.writeBytes method. + */ + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + registrarID.writeBytes(out); + } + + /** + * Reads the default serializable field value for this instance, followed + * by the registrar's service ID encoded as specified by the + * ServiceID.writeBytes method. Verifies that the deserialized registrar + * reference is non-null. + */ + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException + { + in.defaultReadObject(); + registrarID = new ServiceID(in); + if (server == null) { + throw new InvalidObjectException("null server"); + } + } + + /** + * Throws InvalidObjectException, since data for this class is required. + */ + private void readObjectNoData() throws ObjectStreamException { + throw new InvalidObjectException("no data"); + } +}
