Modified: 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java Sun Jul  5 11:41:39 2020
@@ -1,639 +1,641 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.rmi.MarshalledObject;
-import java.rmi.RemoteException;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.discovery.LookupLocator;
-import net.jini.core.entry.Entry;
-import net.jini.core.lookup.ServiceID;
-import net.jini.discovery.DiscoveryManagement;
-import net.jini.discovery.DiscoveryLocatorManagement;
-import net.jini.discovery.DiscoveryGroupManagement;
-import net.jini.discovery.LookupDiscoveryManager;
-import net.jini.lookup.JoinManager;
-
-import net.jini.config.Configuration;
-import net.jini.config.ConfigurationException;
-import net.jini.security.ProxyPreparer;
-
-import org.apache.river.config.Config;
-import org.apache.river.logging.Levels;
-import net.jini.io.MarshalledInstance;
-
-/**
- * <code>JoinStateManager</code> provides a utility that manages
- * a service's join state (optionally persisting that state) and
- * manages the join protocol protocol on behalf of the service.
- *
- * @author Sun Microsystems, Inc.
- *
- * @see JoinManager
- */
-// @see JoinAdminState
-class JoinStateManager implements StorableObject<JoinStateManager> {
-    /** <code>ProxyPreparer</code> for <code>LookupLocators</code> */
-    private volatile ProxyPreparer lookupLocatorPreparer;
-
-    /**
-     * Object used to find lookups. Has to implement DiscoveryManagement
-     * and DiscoveryLocatorManagement as well as DiscoveryGroupManagement.
-     */
-    private volatile DiscoveryGroupManagement dgm;
-
-    /**
-     * <code>JoinManager</code> that is handling the details of binding
-     * into Jini lookup services.
-     */
-    private volatile JoinManager  mgr;
-
-    /**
-     * The object that is coordinating our persistent state.
-     */
-    private volatile LogOps log;
-
-    /**
-     * The list of attributes to start with. This field is only used
-     * to carry data from the <code>restore</code> method to the
-     * <code>startManager</code> method. The current set of attributes
-     * is kept by <code>mgr</code>. This field is nulled out by
-     * <code>startManager</code>.
-     */
-    private volatile Entry[]           attributes;
-
-    /**
-     * The list of <code>LookupLocator</code>s to start with. This
-     * field is only used to carry data from the <code>restore</code>
-     * method to the <code>startManager</code> method. The current set
-     * of attributes is kept by <code>mgr</code>. This field is nulled
-     * out by <code>startManager</code>.  
-     */
-    private volatile LookupLocator     locators[];
-
-
-    /**
-     * The list of group names to start with. This field is only used
-     * to carry data from the <code>restore</code> method to the
-     * <code>startManager</code> method. The current set of attributes
-     * is kept by <code>mgr</code>. This field is nulled out by
-     * <code>startManager</code>.
-     */
-    private volatile String            groups[];
-
-    /**
-     * Conceptually, true if this is the first time this
-     * service has come up, implemented as if there was
-     * no previous state then this is the first time.
-     */
-    private volatile boolean initial = true;
-
-    /** Logger for logging join related information */
-    private static final Logger logger = 
-       Logger.getLogger(OutriggerServerImpl.joinLoggerName);
-    
-    /**
-     * Simple constructor.
-     */
-    JoinStateManager() { }
-
-    /**
-     * Start the manager. Start looking for lookup and registering
-     * with them.
-     * @param config object to use to obtain
-     *               <code>DiscoveryManagement</code> object, and if
-     *               this is the initial incarnation of this service, 
-     *               the object used to get the initial set of groups,
-     *               locators, and deployer defined attributes.
-     * @param log object used to persist the manager's state, may be
-     *            <code>null</code>.
-     * @param serviceID The <code>ServiceID</code> to register
-     *                  under.
-     * @param service The proxy object to register with lookups.
-     * @param baseAttributes Any attributes the implementation wants
-     *                       attached, only used if this is the
-     *                       initial incarnation.
-     * @throws IOException if the is problem persisting the 
-     *         initial state or in starting discovery.
-     * @throws ConfigurationException if the configuration
-     *         is invalid.
-     * @throws NullPointerException if <code>config</code>, 
-     *         or <code>serviceID</code> is  <code>null</code>.
-     */
-    void startManager(Configuration config, LogOps log, Object service,
-                     ServiceID serviceID, Entry[] baseAttributes) 
-       throws IOException, ConfigurationException
-    {
-       // Default do nothing preparer
-       final ProxyPreparer defaultPreparer = 
-           new net.jini.security.BasicProxyPreparer();
-
-       if (serviceID == null)
-           throw new NullPointerException("serviceID can't be null");
-
-       this.log = log;
-
-       lookupLocatorPreparer = 
-           (ProxyPreparer)Config.getNonNullEntry(config, 
-               OutriggerServerImpl.COMPONENT_NAME, "lookupLocatorPreparer",
-               ProxyPreparer.class, defaultPreparer);
-
-       dgm = (DiscoveryGroupManagement)
-           Config.getNonNullEntry(config, 
-               OutriggerServerImpl.COMPONENT_NAME, "discoveryManager",
-               DiscoveryGroupManagement.class, 
-               new LookupDiscoveryManager(
-                    DiscoveryGroupManagement.NO_GROUPS, null, null,
-                   config));
-
-       if (!(dgm instanceof DiscoveryManagement))
-           throw throwNewConfigurationException("Entry for component " +
-               OutriggerServerImpl.COMPONENT_NAME + ", name " +
-               "discoveryManager must implement " +
-               "net.jini.discovery.DiscoveryGroupManagement");
-
-       if (!(dgm instanceof DiscoveryLocatorManagement))
-           throw throwNewConfigurationException("Entry for component " +
-               OutriggerServerImpl.COMPONENT_NAME + ", name " +
-               "discoveryManager must implement " +
-               "net.jini.discovery.DiscoveryLocatorManagement");
-
-       final String[] toCheck = dgm.getGroups();
-       if (toCheck == null || toCheck.length != 0)
-           throw throwNewConfigurationException("Entry for component " +
-               OutriggerServerImpl.COMPONENT_NAME + ", name " +
-               "discoveryManager must be initially configured with no " +
-                "groups");
-
-       if (((DiscoveryLocatorManagement)dgm).getLocators().length != 0)
-           throw throwNewConfigurationException("Entry for component " +
-                OutriggerServerImpl.COMPONENT_NAME + ", name " +
-               "discoveryManager must be initially configured with no " +
-               "locators");
-
-       // if this is the first incarnation, consult config for groups,
-       // locators and attributes.
-       if (initial) {
-           groups = (String[])
-               config.getEntry(OutriggerServerImpl.COMPONENT_NAME,
-                   "initialLookupGroups", String[].class,
-                   new String[]{""});
-
-           locators = (LookupLocator[])
-               Config.getNonNullEntry(config, 
-                   OutriggerServerImpl.COMPONENT_NAME,
-                    "initialLookupLocators", LookupLocator[].class, 
-                    new LookupLocator[0]);
-
-           final Entry[] cAttrs = (Entry[])
-               Config.getNonNullEntry(config, 
-                    OutriggerServerImpl.COMPONENT_NAME,
-                   "initialLookupAttributes", Entry[].class, new Entry[0]);
-
-           if (cAttrs.length == 0) {
-               attributes = baseAttributes;
-           } else {
-               attributes = 
-                   new Entry[cAttrs.length + baseAttributes.length];
-               System.arraycopy(baseAttributes, 0, attributes, 
-                                0, baseAttributes.length);
-               System.arraycopy(cAttrs, 0, attributes, 
-                                baseAttributes.length, cAttrs.length);
-           }
-       } else {
-           /* recovery : if there are any locators get and
-            * use recoveredLookupLocatorPreparer
-            */
-           if (locators.length > 0) {
-               final ProxyPreparer recoveredLookupLocatorPreparer = 
-                   (ProxyPreparer)Config.getNonNullEntry(config, 
-                       OutriggerServerImpl.COMPONENT_NAME,
-                       "recoveredLookupLocatorPreparer", ProxyPreparer.class,
-                        defaultPreparer);
-
-               final List prepared = new java.util.LinkedList();
-               for (int i=0; i<locators.length; i++) {
-                   final LookupLocator locator = locators[i];
-                   try {
-                       prepared.add(recoveredLookupLocatorPreparer.
-                                    prepareProxy(locator));
-                   } catch (Throwable t) {
-                       logger.log(Level.INFO,
-                           "Encountered exception preparing lookup locator " +
-                           "for " + locator + ", dropping locator", t);
-                   }
-               }
-
-               locators = 
-                   (LookupLocator[])prepared.toArray(new LookupLocator[0]);
-           }
-       }
-
-       // Now that we have groups & locators (either from 
-       // a previous incarnation or from the config) start discovery.
-       if (logger.isLoggable(Level.CONFIG)) {
-           if (groups == null) {
-               logger.log(Level.CONFIG, "joining all groups");
-           } else if (groups.length == 0) {
-               logger.log(Level.CONFIG, "joining no groups");  
-           } else {
-               final StringBuffer buf = new StringBuffer();
-               buf.append("joining groups:");
-               for (int i=0; i<groups.length; i++) {
-                   if (i != 0)
-                       buf.append(",");
-
-                   buf.append("\"");
-                   buf.append(groups[i]);
-                   buf.append("\"");
-               }
-               logger.log(Level.CONFIG, buf.toString());
-           }
-
-           if (locators.length == 0) {
-               logger.log(Level.CONFIG, "joining no specific registrars");
-           } else {
-               final StringBuffer buf = new StringBuffer();
-               buf.append("joining the specific registrars:");
-               for (int i=0; i<locators.length; i++) {
-                   if (i != 0)
-                       buf.append(", ");
-
-                   buf.append(locators[i]);
-               }
-               logger.log(Level.CONFIG, buf.toString());
-           }
-
-           if (attributes.length == 0) {
-               logger.log(Level.CONFIG, "registering no attributes");
-           } else {
-               final StringBuffer buf = new StringBuffer();
-               buf.append("registering the attributes:");
-               for (int i=0; i<attributes.length; i++) {
-                   if (i != 0)
-                       buf.append(", ");
-
-                   buf.append(attributes[i]);
-               }
-               logger.log(Level.CONFIG, buf.toString());               
-           }
-       }
-
-       dgm.setGroups(groups);
-       ((DiscoveryLocatorManagement)dgm).setLocators(locators);
-
-       mgr = new JoinManager(service, attributes, serviceID, 
-                             (DiscoveryManagement)dgm, null, config);
-
-       // Once we are running we don't need the attributes,
-       // locators, and groups fields, null them out (the
-       // state is in the mgr and dgm.
-       attributes = null;
-       groups = null;
-       locators = null;
-
-       // Now that we have state, make sure it is written to disk.
-       update();
-    }
-
-    /** 
-     * Make a good faith attempt to terminate
-     * discovery, and cancel any lookup registrations.  */
-    public void destroy() {
-       // Unregister with lookup
-
-       // Terminate the JoinManager first so it will not call
-       // into the dgm after it has been terminated.
-       if (mgr != null)
-           mgr.terminate();
-
-       if (dgm != null) 
-           ((DiscoveryManagement)dgm).terminate();
-    }
-
-    /* Basically we are implementing JoinAdmin, for get methods we just
-     * delegate to JoinManager, for the set methods we call
-     * JoinManager to and then persist the change by calling the
-     * appropriate method on our JoinAdminState.  If the call on our
-     * JoinAdminState throws an IOException we throw a runtime
-     * exception since JoinAdmin methods don't let us throw a
-     * IOException 
-     */
-
-    /** 
-     * Get the current attribute sets for the service. 
-     * 
-     * @return the current attribute sets for the service
-     */
-    public Entry[] getLookupAttributes() {
-       return mgr.getAttributes();
-    }
-
-    /** 
-     * Add attribute sets for the service.  The resulting set will be used
-     * for all future joins.  The attribute sets are also added to all 
-     * currently-joined lookup services.
-     *
-     * @param attrSets the attribute sets to add
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     */
-    public void addLookupAttributes(Entry[] attrSets) {
-       mgr.addAttributes(attrSets, true);
-       update();
-    }
-
-    /**  
-     * Modify the current attribute sets, using the same semantics as
-     * ServiceRegistration.modifyAttributes.  The resulting set will be used
-     * for all future joins.  The same modifications are also made to all 
-     * currently-joined lookup services.
-     *
-     * @param attrSetTemplates the templates for matching attribute sets
-     * @param attrSets the modifications to make to matching sets
-     *     
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     * @see net.jini.core.lookup.ServiceRegistration#modifyAttributes
-     */
-    public void modifyLookupAttributes(Entry[] attrSetTemplates,
-                                      Entry[] attrSets) {
-       mgr.modifyAttributes(attrSetTemplates, attrSets, true);
-       update();
-    }
-
-   /**
-     * Get the list of groups to join.  An empty array means the service
-     * joins no groups (as opposed to "all" groups).
-     *
-     * @return an array of groups to join. An empty array means the service
-     *         joins no groups (as opposed to "all" groups).
-     * @see #setLookupGroups
-     */
-    public String[] getLookupGroups() {
-       return dgm.getGroups();
-    }
-
-    /**
-     * Add new groups to the set to join.  Lookup services in the new
-     * groups will be discovered and joined.
-     *
-     * @param groups groups to join
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     * @see #removeLookupGroups
-     */
-    public void addLookupGroups(String[] groups) {
-       try {
-           dgm.addGroups(groups);
-       } catch (IOException e) {
-           throw propagateIOException("Could not change groups", e);
-       }
-       update();
-    }
-
-    /**
-     * Remove groups from the set to join.  Leases are cancelled at lookup
-     * services that are not members of any of the remaining groups.
-     *
-     * @param groups groups to leave
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     * @see #addLookupGroups
-     */
-    public void removeLookupGroups(String[] groups) {
-       dgm.removeGroups(groups);
-       update();
-    }
-
-    /**
-     * Replace the list of groups to join with a new list.  Leases are
-     * cancelled at lookup services that are not members of any of the
-     * new groups.  Lookup services in the new groups will be discovered
-     * and joined.
-     *
-     * @param groups groups to join
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     * @see #getLookupGroups
-     */
-    public void setLookupGroups(String[] groups) {
-       try {
-           dgm.setGroups(groups);
-       } catch (IOException e) {
-           throw propagateIOException("Could not change groups", e);
-       }
-       update();
-    }
-    
-    /** 
-     * Get the list of locators of specific lookup services to join. 
-     *
-     * @return the list of locators of specific lookup services to join
-     * @see #setLookupLocators
-     */
-    public LookupLocator[] getLookupLocators() {
-       return ((DiscoveryLocatorManagement)dgm).getLocators();
-    }
-
-    /**
-     * Add locators for specific new lookup services to join.  The new
-     * lookup services will be discovered and joined.
-     *
-     * @param locators locators of specific lookup services to join
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     * @see #removeLookupLocators
-     */
-    public void addLookupLocators(LookupLocator[] locators) 
-        throws RemoteException
-    {
-       prepareLocators(locators);
-       ((DiscoveryLocatorManagement)dgm).addLocators(locators);
-       update();
-    }
-
-    /**
-     * Remove locators for specific lookup services from the set to join.
-     * Any leases held at the lookup services are cancelled.
-     *
-     * @param locators locators of specific lookup services to leave
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     * @see #addLookupLocators
-     */
-    public void removeLookupLocators(LookupLocator[] locators) 
-        throws RemoteException
-    {
-       prepareLocators(locators);
-       ((DiscoveryLocatorManagement)dgm).removeLocators(locators);
-       update();
-    }
-
-    /**
-     * Replace the list of locators of specific lookup services to join
-     * with a new list.  Leases are cancelled at lookup services that were
-     * in the old list but are not in the new list.  Any new lookup services
-     * will be discovered and joined.
-     *
-     * @param locators locators of specific lookup services to join
-     * @throws java.rmi.RuntimeException if the change can not be persisted.
-     * @see #getLookupLocators
-     */
-    public void setLookupLocators(LookupLocator[] locators) 
-        throws RemoteException
-    {
-       prepareLocators(locators);
-       ((DiscoveryLocatorManagement)dgm).setLocators(locators);
-       update();
-    }
-
-    private void update() {
-       if (log != null)
-           log.joinStateOp(this);
-    }
-
-    /**
-     * Apply <code>lookupLocatorPreparer</code> to each locator in the
-     * array, replacing the original locator with the result of the
-     * <code>prepareProxy</code> call. If call fails with an exception
-     * throw that exception.  
-     * @param locators the <code>LookupLocator</code>s to be prepared.
-     * @throws RemoteException if preparation of any of the locators
-     *         does.
-     * @throws SecurityException if preparation of any of the locators
-     *         does.
-     */
-    private void prepareLocators(LookupLocator[] locators) 
-        throws RemoteException 
-    {
-       for (int i = 0; i<locators.length; i++) 
-           locators[i] = (LookupLocator)lookupLocatorPreparer.prepareProxy(
-               locators[i]);
-    }
-
-    /**
-     * Utility method to write out an array of entities to an
-     * <code>ObjectOutputStream</code>.  Can be recovered by a call
-     * to <code>readAttributes()</code>
-     * <p>
-     * Packages each attribute in its own <code>MarshalledObject</code> so
-     * a bad codebase on an attribute class will not corrupt the whole array.
-     */
-    // @see JoinAdminActivationState#readAttributes
-    static private void writeAttributes(Entry[] attributes,
-                                        ObjectOutputStream out)
-        throws IOException
-    {
-        // Need to package each attribute in its own marshaled object,
-        // this makes sure that the attribute's code base is preserved
-        // and when we unpack to discard attributes who's codebase
-        // has been lost without throwing away those we can still deal with.
-         
-        out.writeInt(attributes.length);
-        for (int i=0; i<attributes.length; i++) {
-            out.writeObject(
-                new MarshalledInstance(attributes[i]).convertToMarshalledObject());
-       }
-    }
- 
-    /**
-     * Utility method to read in an array of entities from a
-     * <code>ObjectInputStream</code>.  Array should have been written
-     * by a call to <code>writeAttributes()</code>
-     * <p>
-     *   
-     * Will try and recover as many attributes as possible.
-     * Attributes which can't be recovered won't be returned but they
-     * will remain in the log.
-     */
-    // @see JoinAdminActivationState#writeAttributes
-    static private Entry[] readAttributes(ObjectInputStream in)
-        throws IOException, ClassNotFoundException
-    {
-        final List entries = new java.util.LinkedList();
-        final int objectCount = in.readInt();
-        for (int i=0; i<objectCount; i++) {
-            try {
-                MarshalledObject mo = (MarshalledObject)in.readObject();
-                entries.add( new MarshalledInstance(mo).get(false));
-            } catch (IOException e) {
-               logger.log(Level.INFO, "Encountered IOException recovering " +
-                    "attribute, dropping attribute", e);
-            } catch (ClassNotFoundException e) {
-               logger.log(Level.INFO, "Encountered ClassNotFoundException " +
-                   "recovering attribute, dropping attribute", e);
-            }
-        }
- 
-        return (Entry[])entries.toArray(new Entry[0]);
-    }
-
-    // -----------------------------------
-    //  Methods required by StorableObject
-    // -----------------------------------
- 
-    // inherit doc comment
-    public void store(ObjectOutputStream out) throws IOException {
-        writeAttributes(mgr.getAttributes(), out);
-        out.writeObject(((DiscoveryLocatorManagement)dgm).getLocators());
-        out.writeObject(dgm.getGroups());
-    }
-
-    // inherit doc comment
-    public JoinStateManager restore(ObjectInputStream in) 
-       throws IOException, ClassNotFoundException 
-    {
-       initial = false;
-        attributes = readAttributes(in);
-        locators   = (LookupLocator [])in.readObject();
-        groups     = (String [])in.readObject();
-        return this;
-    }
-
-    /**
-     * Construct, log, and throw a new ConfigurationException with
-     * the given message.
-     */
-    private static ConfigurationException throwNewConfigurationException(
-            String msg) 
-       throws ConfigurationException
-    {
-       final ConfigurationException e = new ConfigurationException(msg);
-
-       if (logger.isLoggable(Levels.FAILED)) {
-           logger.log(Levels.FAILED, msg, e);
-       }
-
-       throw e;
-    }  
-
-    /** 
-     * Propagate an IOException by wrapping it in a RuntimeException.
-     * Performs appropriate logging.
-     */
-    private static RuntimeException propagateIOException(String msg,
-                                                        IOException nested)
-    {
-       final RuntimeException e = new RuntimeException(msg, nested);
-       if (logger.isLoggable(Levels.FAILED)) {
-           logger.log(Levels.FAILED, msg, e);
-       }
-
-       throw e;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.rmi.MarshalledObject;
+import java.rmi.RemoteException;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.discovery.LookupLocator;
+import net.jini.core.entry.Entry;
+import net.jini.core.lookup.ServiceID;
+import net.jini.discovery.DiscoveryManagement;
+import net.jini.discovery.DiscoveryLocatorManagement;
+import net.jini.discovery.DiscoveryGroupManagement;
+import net.jini.discovery.LookupDiscoveryManager;
+import net.jini.lookup.JoinManager;
+import org.apache.river.outrigger.proxy.StorableObject;
+
+
+import net.jini.config.Configuration;
+import net.jini.config.ConfigurationException;
+import net.jini.security.ProxyPreparer;
+
+import org.apache.river.config.Config;
+import org.apache.river.logging.Levels;
+import net.jini.io.MarshalledInstance;
+
+/**
+ * <code>JoinStateManager</code> provides a utility that manages
+ * a service's join state (optionally persisting that state) and
+ * manages the join protocol protocol on behalf of the service.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see JoinManager
+ */
+// @see JoinAdminState
+class JoinStateManager implements StorableObject<JoinStateManager> {
+    /** <code>ProxyPreparer</code> for <code>LookupLocators</code> */
+    private volatile ProxyPreparer lookupLocatorPreparer;
+
+    /**
+     * Object used to find lookups. Has to implement DiscoveryManagement
+     * and DiscoveryLocatorManagement as well as DiscoveryGroupManagement.
+     */
+    private volatile DiscoveryGroupManagement dgm;
+
+    /**
+     * <code>JoinManager</code> that is handling the details of binding
+     * into Jini lookup services.
+     */
+    private volatile JoinManager  mgr;
+
+    /**
+     * The object that is coordinating our persistent state.
+     */
+    private volatile LogOps log;
+
+    /**
+     * The list of attributes to start with. This field is only used
+     * to carry data from the <code>restore</code> method to the
+     * <code>startManager</code> method. The current set of attributes
+     * is kept by <code>mgr</code>. This field is nulled out by
+     * <code>startManager</code>.
+     */
+    private volatile Entry[]           attributes;
+
+    /**
+     * The list of <code>LookupLocator</code>s to start with. This
+     * field is only used to carry data from the <code>restore</code>
+     * method to the <code>startManager</code> method. The current set
+     * of attributes is kept by <code>mgr</code>. This field is nulled
+     * out by <code>startManager</code>.  
+     */
+    private volatile LookupLocator     locators[];
+
+
+    /**
+     * The list of group names to start with. This field is only used
+     * to carry data from the <code>restore</code> method to the
+     * <code>startManager</code> method. The current set of attributes
+     * is kept by <code>mgr</code>. This field is nulled out by
+     * <code>startManager</code>.
+     */
+    private volatile String            groups[];
+
+    /**
+     * Conceptually, true if this is the first time this
+     * service has come up, implemented as if there was
+     * no previous state then this is the first time.
+     */
+    private volatile boolean initial = true;
+
+    /** Logger for logging join related information */
+    private static final Logger logger = 
+       Logger.getLogger(OutriggerServerImpl.joinLoggerName);
+    
+    /**
+     * Simple constructor.
+     */
+    JoinStateManager() { }
+
+    /**
+     * Start the manager. Start looking for lookup and registering
+     * with them.
+     * @param config object to use to obtain
+     *               <code>DiscoveryManagement</code> object, and if
+     *               this is the initial incarnation of this service, 
+     *               the object used to get the initial set of groups,
+     *               locators, and deployer defined attributes.
+     * @param log object used to persist the manager's state, may be
+     *            <code>null</code>.
+     * @param serviceID The <code>ServiceID</code> to register
+     *                  under.
+     * @param service The proxy object to register with lookups.
+     * @param baseAttributes Any attributes the implementation wants
+     *                       attached, only used if this is the
+     *                       initial incarnation.
+     * @throws IOException if the is problem persisting the 
+     *         initial state or in starting discovery.
+     * @throws ConfigurationException if the configuration
+     *         is invalid.
+     * @throws NullPointerException if <code>config</code>, 
+     *         or <code>serviceID</code> is  <code>null</code>.
+     */
+    void startManager(Configuration config, LogOps log, Object service,
+                     ServiceID serviceID, Entry[] baseAttributes) 
+       throws IOException, ConfigurationException
+    {
+       // Default do-nothing preparer, used when the config supplies none
+       final ProxyPreparer defaultPreparer = 
+           new net.jini.security.BasicProxyPreparer();
+
+       if (serviceID == null)
+           throw new NullPointerException("serviceID can't be null");
+
+       this.log = log;
+
+       lookupLocatorPreparer = 
+           (ProxyPreparer)Config.getNonNullEntry(config, 
+               OutriggerServerImpl.COMPONENT_NAME, "lookupLocatorPreparer",
+               ProxyPreparer.class, defaultPreparer);
+
+       // NOTE(review): the default below is constructed eagerly, even when
+       dgm = (DiscoveryGroupManagement) // the config supplies an entry
+           Config.getNonNullEntry(config, 
+               OutriggerServerImpl.COMPONENT_NAME, "discoveryManager",
+               DiscoveryGroupManagement.class, 
+               new LookupDiscoveryManager(
+                    DiscoveryGroupManagement.NO_GROUPS, null, null,
+                   config));
+       // dgm is later cast to both of the interfaces checked here
+       if (!(dgm instanceof DiscoveryManagement))
+           throw throwNewConfigurationException("Entry for component " +
+               OutriggerServerImpl.COMPONENT_NAME + ", name " +
+               "discoveryManager must implement " +
+               "net.jini.discovery.DiscoveryManagement");
+
+       if (!(dgm instanceof DiscoveryLocatorManagement))
+           throw throwNewConfigurationException("Entry for component " +
+               OutriggerServerImpl.COMPONENT_NAME + ", name " +
+               "discoveryManager must implement " +
+               "net.jini.discovery.DiscoveryLocatorManagement");
+
+       final String[] toCheck = dgm.getGroups();
+       if (toCheck == null || toCheck.length != 0)
+           throw throwNewConfigurationException("Entry for component " +
+               OutriggerServerImpl.COMPONENT_NAME + ", name " +
+               "discoveryManager must be initially configured with no " +
+                "groups");
+
+       if (((DiscoveryLocatorManagement)dgm).getLocators().length != 0)
+           throw throwNewConfigurationException("Entry for component " +
+                OutriggerServerImpl.COMPONENT_NAME + ", name " +
+               "discoveryManager must be initially configured with no " +
+               "locators");
+
+       // if this is the first incarnation, consult config for groups,
+       // locators and attributes.
+       if (initial) {
+           groups = (String[]) // may be null, which means join all groups
+               config.getEntry(OutriggerServerImpl.COMPONENT_NAME,
+                   "initialLookupGroups", String[].class,
+                   new String[]{""});
+
+           locators = (LookupLocator[])
+               Config.getNonNullEntry(config, 
+                   OutriggerServerImpl.COMPONENT_NAME,
+                    "initialLookupLocators", LookupLocator[].class, 
+                    new LookupLocator[0]);
+
+           final Entry[] cAttrs = (Entry[])
+               Config.getNonNullEntry(config, 
+                    OutriggerServerImpl.COMPONENT_NAME,
+                   "initialLookupAttributes", Entry[].class, new Entry[0]);
+
+           if (cAttrs.length == 0) {
+               attributes = baseAttributes;
+           } else {
+               attributes = // base attributes first, then configured ones
+                   new Entry[cAttrs.length + baseAttributes.length];
+               System.arraycopy(baseAttributes, 0, attributes, 
+                                0, baseAttributes.length);
+               System.arraycopy(cAttrs, 0, attributes, 
+                                baseAttributes.length, cAttrs.length);
+           }
+       } else {
+           /* recovery : if there are any locators get and
+            * use recoveredLookupLocatorPreparer
+            */
+           if (locators.length > 0) {
+               final ProxyPreparer recoveredLookupLocatorPreparer = 
+                   (ProxyPreparer)Config.getNonNullEntry(config, 
+                       OutriggerServerImpl.COMPONENT_NAME,
+                       "recoveredLookupLocatorPreparer", ProxyPreparer.class,
+                        defaultPreparer);
+
+               final List prepared = new java.util.LinkedList();
+               for (int i=0; i<locators.length; i++) {
+                   final LookupLocator locator = locators[i];
+                   try {
+                       prepared.add(recoveredLookupLocatorPreparer.
+                                    prepareProxy(locator));
+                   } catch (Throwable t) { // best effort: log and drop it
+                       logger.log(Level.INFO,
+                           "Encountered exception preparing lookup locator " +
+                           "for " + locator + ", dropping locator", t);
+                   }
+               }
+
+               locators = 
+                   (LookupLocator[])prepared.toArray(new LookupLocator[0]);
+           }
+       }
+
+       // Now that we have groups & locators (either from 
+       // a previous incarnation or from the config) start discovery.
+       if (logger.isLoggable(Level.CONFIG)) {
+           if (groups == null) {
+               logger.log(Level.CONFIG, "joining all groups");
+           } else if (groups.length == 0) {
+               logger.log(Level.CONFIG, "joining no groups");  
+           } else {
+               final StringBuffer buf = new StringBuffer();
+               buf.append("joining groups:");
+               for (int i=0; i<groups.length; i++) {
+                   if (i != 0)
+                       buf.append(",");
+
+                   buf.append("\"");
+                   buf.append(groups[i]);
+                   buf.append("\"");
+               }
+               logger.log(Level.CONFIG, buf.toString());
+           }
+
+           if (locators.length == 0) {
+               logger.log(Level.CONFIG, "joining no specific registrars");
+           } else {
+               final StringBuffer buf = new StringBuffer();
+               buf.append("joining the specific registrars:");
+               for (int i=0; i<locators.length; i++) {
+                   if (i != 0)
+                       buf.append(", ");
+
+                   buf.append(locators[i]);
+               }
+               logger.log(Level.CONFIG, buf.toString());
+           }
+
+           if (attributes.length == 0) {
+               logger.log(Level.CONFIG, "registering no attributes");
+           } else {
+               final StringBuffer buf = new StringBuffer();
+               buf.append("registering the attributes:");
+               for (int i=0; i<attributes.length; i++) {
+                   if (i != 0)
+                       buf.append(", ");
+
+                   buf.append(attributes[i]);
+               }
+               logger.log(Level.CONFIG, buf.toString());               
+           }
+       }
+
+       dgm.setGroups(groups);
+       ((DiscoveryLocatorManagement)dgm).setLocators(locators);
+
+       mgr = new JoinManager(service, attributes, serviceID, 
+                             (DiscoveryManagement)dgm, null, config);
+
+       // Once we are running we don't need the attributes,
+       // locators, and groups fields, null them out (the
+       // state is in the mgr and dgm).
+       attributes = null;
+       groups = null;
+       locators = null;
+
+       // Now that we have state, make sure it is written to disk.
+       update();
+    }
+
+    /** 
+     * Make a good faith attempt to cancel any lookup registrations
+     * and to terminate discovery.  */
+    public void destroy() {
+       // Unregister with lookup; skip whichever of mgr/dgm is not set
+
+       // Terminate the JoinManager first so it will not call
+       // into the dgm after it has been terminated.
+       if (mgr != null)
+           mgr.terminate();
+
+       if (dgm != null) 
+           ((DiscoveryManagement)dgm).terminate();
+    }
+
+    /* Basically we are implementing JoinAdmin, for get methods we just
+     * delegate to the JoinManager or the discovery manager, for the set
+     * methods we call the JoinManager or the discovery manager and then
+     * persist the change by calling update().  If changing the groups
+     * throws an IOException we throw a runtime
+     * exception since JoinAdmin methods don't let us throw an
+     * IOException 
+     */
+
+    /** 
+     * Get the current attribute sets for the service, as reported
+     * by the JoinManager.
+     * @return the current attribute sets for the service
+     */
+    public Entry[] getLookupAttributes() {
+       return mgr.getAttributes();
+    }
+
+    /** 
+     * Add attribute sets for the service.  The resulting set will be used
+     * for all future joins.  The attribute sets are also added to all 
+     * currently-joined lookup services.
+     *
+     * @param attrSets the attribute sets to add
+     * @throws RuntimeException if the change can not be persisted.
+     */
+    public void addLookupAttributes(Entry[] attrSets) {
+       mgr.addAttributes(attrSets, true);
+       update(); // persist the new join state
+    }
+
+    /**  
+     * Modify the current attribute sets, using the same semantics as
+     * ServiceRegistration.modifyAttributes.  The resulting set will be used
+     * for all future joins.  The same modifications are also made to all 
+     * currently-joined lookup services.
+     *
+     * @param attrSetTemplates the templates for matching attribute sets
+     * @param attrSets the modifications to make to matching sets
+     *     
+     * @throws RuntimeException if the change can not be persisted.
+     * @see net.jini.core.lookup.ServiceRegistration#modifyAttributes
+     */
+    public void modifyLookupAttributes(Entry[] attrSetTemplates,
+                                      Entry[] attrSets) {
+       mgr.modifyAttributes(attrSetTemplates, attrSets, true);
+       update(); // persist the new join state
+    }
+
+    /**
+     * Get the list of groups to join, as reported by the discovery
+     * manager.
+     *
+     * @return an array of groups to join. An empty array means the service
+     *         joins no groups (as opposed to "all" groups).
+     * @see #setLookupGroups
+     */
+    public String[] getLookupGroups() {
+       return dgm.getGroups();
+    }
+
+    /**
+     * Add new groups to the set to join.  Lookup services in the new
+     * groups will be discovered and joined.
+     *
+     * @param groups groups to join
+     * @throws RuntimeException if the groups can not be changed or persisted.
+     * @see #removeLookupGroups
+     */
+    public void addLookupGroups(String[] groups) {
+       try {
+           dgm.addGroups(groups);
+       } catch (IOException e) {
+           throw propagateIOException("Could not change groups", e);
+       }
+       update(); // persist the new join state
+    }
+
+    /**
+     * Remove groups from the set to join.  Leases are cancelled at lookup
+     * services that are not members of any of the remaining groups.
+     *
+     * @param groups groups to leave
+     * @throws RuntimeException if the change can not be persisted.
+     * @see #addLookupGroups
+     */
+    public void removeLookupGroups(String[] groups) {
+       dgm.removeGroups(groups);
+       update(); // persist the new join state
+    }
+
+    /**
+     * Replace the list of groups to join with a new list.  Leases are
+     * cancelled at lookup services that are not members of any of the
+     * new groups.  Lookup services in the new groups will be discovered
+     * and joined.
+     *
+     * @param groups groups to join
+     * @throws RuntimeException if the groups can not be changed or persisted.
+     * @see #getLookupGroups
+     */
+    public void setLookupGroups(String[] groups) {
+       try {
+           dgm.setGroups(groups);
+       } catch (IOException e) {
+           throw propagateIOException("Could not change groups", e);
+       }
+       update(); // persist the new join state
+    }
+    
+    /** 
+     * Get the list of locators of specific lookup services to join,
+     * as reported by the discovery manager.
+     * @return the list of locators of specific lookup services to join
+     * @see #setLookupLocators
+     */
+    public LookupLocator[] getLookupLocators() {
+       return ((DiscoveryLocatorManagement)dgm).getLocators();
+    }
+
+    /**
+     * Add locators for specific new lookup services to join.  The new
+     * lookup services will be discovered and joined.
+     * @param locators locators of specific lookup services to join
+     * @throws RemoteException if preparation of any of the locators does.
+     * @throws RuntimeException if the change can not be persisted.
+     * @see #removeLookupLocators
+     */
+    public void addLookupLocators(LookupLocator[] locators) 
+        throws RemoteException
+    {
+       prepareLocators(locators); // replaces the array elements in place
+       ((DiscoveryLocatorManagement)dgm).addLocators(locators);
+       update();
+    }
+
+    /**
+     * Remove locators for specific lookup services from the set to join.
+     * Any leases held at the lookup services are cancelled.
+     * @param locators locators of specific lookup services to leave
+     * @throws RemoteException if preparation of any of the locators does.
+     * @throws RuntimeException if the change can not be persisted.
+     * @see #addLookupLocators
+     */
+    public void removeLookupLocators(LookupLocator[] locators) 
+        throws RemoteException
+    {
+       prepareLocators(locators); // replaces the array elements in place
+       ((DiscoveryLocatorManagement)dgm).removeLocators(locators);
+       update();
+    }
+
+    /**
+     * Replace the list of locators of specific lookup services to join
+     * with a new list.  Leases are cancelled at lookup services that were
+     * in the old list but are not in the new list.  Any new lookup services
+     * will be discovered and joined.
+     * @param locators locators of specific lookup services to join
+     * @throws RemoteException if preparation of any of the locators does.
+     * @throws RuntimeException if the change can not be persisted.
+     * @see #getLookupLocators
+     */
+    public void setLookupLocators(LookupLocator[] locators) 
+        throws RemoteException
+    {
+       prepareLocators(locators); // replaces the array elements in place
+       ((DiscoveryLocatorManagement)dgm).setLocators(locators);
+       update();
+    }
+
+    private void update() { // log the current join state, if we have a log
+       if (log != null) // startManager documents that log may be null
+           log.joinStateOp(this);
+    }
+
+    /**
+     * Apply <code>lookupLocatorPreparer</code> to each locator in the
+     * array, replacing the original locator (in place, in the passed
+     * array) with the result of the <code>prepareProxy</code> call. If a
+     * call fails with an exception that exception is thrown.
+     * @param locators the <code>LookupLocator</code>s to be prepared.
+     * @throws RemoteException if preparation of any of the locators
+     *         does.
+     * @throws SecurityException if preparation of any of the locators
+     *         does.
+     */
+    private void prepareLocators(LookupLocator[] locators) 
+        throws RemoteException 
+    {
+       for (int i = 0; i<locators.length; i++) 
+           locators[i] = (LookupLocator)lookupLocatorPreparer.prepareProxy(
+               locators[i]);
+    }
+
+    /**
+     * Utility method to write out an array of entities to an
+     * <code>ObjectOutputStream</code>.  Can be recovered by a call
+     * to <code>readAttributes()</code>
+     * <p>
+     * Packages each attribute in its own <code>MarshalledObject</code> so
+     * a bad codebase on an attribute class will not corrupt the whole array.
+     */
+    // @see #readAttributes
+    static private void writeAttributes(Entry[] attributes,
+                                        ObjectOutputStream out)
+        throws IOException
+    {
+        // Need to package each attribute in its own marshalled object,
+        // this makes sure that the attribute's codebase is preserved
+        // and lets us, when we unpack, discard attributes whose codebase
+        // has been lost without throwing away those we can still deal with.
+         
+        out.writeInt(attributes.length);
+        for (int i=0; i<attributes.length; i++) {
+            out.writeObject(
+                new MarshalledInstance(attributes[i]).convertToMarshalledObject());
+       }
+    }
+ 
+    /**
+     * Utility method to read in an array of entities from an
+     * <code>ObjectInputStream</code>.  Array should have been written
+     * by a call to <code>writeAttributes()</code>
+     * <p>
+     *   
+     * Will try and recover as many attributes as possible.
+     * Attributes which can't be recovered are logged and not returned,
+     * but they will remain in the log.
+     */
+    // @see #writeAttributes
+    static private Entry[] readAttributes(ObjectInputStream in)
+        throws IOException, ClassNotFoundException
+    {
+        final List entries = new java.util.LinkedList();
+        final int objectCount = in.readInt();
+        for (int i=0; i<objectCount; i++) {
+            try {
+                MarshalledObject mo = (MarshalledObject)in.readObject();
+                entries.add( new MarshalledInstance(mo).get(false));
+            } catch (IOException e) {
+               logger.log(Level.INFO, "Encountered IOException recovering " +
+                    "attribute, dropping attribute", e);
+            } catch (ClassNotFoundException e) {
+               logger.log(Level.INFO, "Encountered ClassNotFoundException " +
+                   "recovering attribute, dropping attribute", e);
+            }
+        }
+ 
+        return (Entry[])entries.toArray(new Entry[0]);
+    }
+
+    // -----------------------------------
+    //  Methods required by StorableObject
+    // -----------------------------------
+ 
+    // inherit doc comment; writes attributes, then locators, then groups
+    public void store(ObjectOutputStream out) throws IOException {
+        writeAttributes(mgr.getAttributes(), out);
+        out.writeObject(((DiscoveryLocatorManagement)dgm).getLocators());
+        out.writeObject(dgm.getGroups());
+    }
+
+    // inherit doc comment; reads the fields in the order store() wrote them
+    public JoinStateManager restore(ObjectInputStream in) 
+       throws IOException, ClassNotFoundException 
+    {
+       initial = false; // recovered state: startManager skips initial config
+        attributes = readAttributes(in);
+        locators   = (LookupLocator [])in.readObject();
+        groups     = (String [])in.readObject();
+        return this;
+    }
+
+    /**
+     * Construct, log (at Levels.FAILED), and throw a new
+     * ConfigurationException with the given message; always throws.
+     */
+    private static ConfigurationException throwNewConfigurationException(
+            String msg) 
+       throws ConfigurationException
+    {
+       final ConfigurationException e = new ConfigurationException(msg);
+
+       if (logger.isLoggable(Levels.FAILED)) {
+           logger.log(Levels.FAILED, msg, e);
+       }
+
+       throw e; // the return type only lets callers write "throw ...(msg)"
+    }  
+
+    /** 
+     * Propagate an IOException by wrapping it in a RuntimeException,
+     * logging it at Levels.FAILED, and throwing it; always throws.
+     */
+    private static RuntimeException propagateIOException(String msg,
+                                                        IOException nested)
+    {
+       final RuntimeException e = new RuntimeException(msg, nested);
+       if (logger.isLoggable(Levels.FAILED)) {
+           logger.log(Levels.FAILED, msg, e);
+       }
+
+       throw e; // the return type only lets callers write "throw ...(msg, e)"
+    }
+}

Modified: 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java
 (original)
+++ 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java
 Sun Jul  5 11:41:39 2020
@@ -1,217 +1,221 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import net.jini.id.Uuid;
-
-/**
- * Methods that log an operation.  These are used
- * when writing to the store. <p>
- *
- * The <code>writeOp</code> and <code>takeOp</code> methods can
- * be called under a transaction. For these methods an in-progress
- * transaction is indicated by a non-null <code>txnId</code> parameter.
- * The <code>txnId</code> is an identifier for a transaction.
- * Each unique transaction must have a unique identifier and all
- * write or take operations under the same transaction should
- * use the same identifier. The store does not interpret the
- * identifier in any way. <p>
- *
- * <i>Note: Because the transaction identifier
- * must be unique, it can not be <code>ServerTransaction.id</code>.
- * Instead this identifier is the <code>Txn</code> ID. </i> <p>
- *
- * When a transaction is closed, the <code>prepareOp</code>,
- * <code>commitOp</code>, and <code>abortOp</code> methods are
- * passed the identifier (<code>txnId</code>) for that transaction. If
- * <code>prepareOp</code> is called and there is a restart, the
- * <code>txnId</code> passed to the write and take operations will
- * be passed back to the server via the <code>Recover.recoverWrite</code>
- * and <code>Recover.recoverTake</code> methods. Likewise the same
- * identifier will be passed to <code>Recover.recoverTransaction</code>.
- *
- * @see Store
- * @see Recover
- *
- * @author Sun Microsystems, Inc.
- *
- */
-public interface LogOps {
-
-    /**
-     * Log a server boot (first time start or any reactivation).
-     *
-     * @see Recover#recoverSessionId
-     *
-     * @param time stamp for this boot
-     *
-     * @param sessionId of this boot
-     */
-    void bootOp(long time, long sessionId);
-
-    /**
-     * Log an update to the join state
-     *
-     * @see Recover#recoverJoinState
-     *
-     * @param state to be logged
-     *
-     */
-    void joinStateOp(StorableObject state);
-
-    /**
-     * Log a <code>write</code> operation. If the operation was
-     * performed under a transaction the <code>txnId</code> is
-     * the identifier for that transaction.
-     *
-     * @see Recover#recoverWrite
-     *
-     * @param entry to be logged
-     *
-     * @param txnId transaction identifier or <code>null</code> if
-     *        no transaction is active for this write
-     */
-    void writeOp(StorableResource entry, Long txnId);
-
-    /**
-     * Log a batch <code>write</code> operation. If the operation was
-     * performed under a transaction the <code>txnId</code> is
-     * the identifier for that transaction.
-     *
-     * @see Recover#recoverWrite
-     * @param entries to be logged
-     * @param txnId transaction identifier or <code>null</code> if
-     *        no transaction is active for this write
-     */
-    void writeOp(StorableResource entries[], Long txnId);
-
-    /**
-     * Log a <code>take</code> operation. If the operation was
-     * performed under a transaction the <code>txnId</code> is
-     * the identifier for that transaction.
-     *
-     * @see Recover#recoverTake
-     *
-     * @param cookie ID identifying the entry target to be taken
-     *
-     * @param txnId transaction identifier or <code>null</code> if
-     *        no transaction is active for this take
-     */
-    void takeOp(Uuid cookie, Long txnId);
-
-    /**
-     * Log a batch <code>take</code> operation. If the operation was
-     * performed under a transaction the <code>txnId</code> is
-     * the identifier for that transaction.
-     *
-     * @see Recover#recoverTake
-     *
-     * @param cookies IDs identifying the entries to be taken
-     *
-     * @param txnId transaction identifier or <code>null</code> if
-     *        no transaction is active for this take
-     */
-    void takeOp(Uuid[] cookies, Long txnId);
-
-    /**
-     * Log a <code>notify</code> operation.  Notifications under
-     * transactions are lost at the end of the transaction, so the
-     * only ones that are logged are those that are under no
-     * transaction.
-     *
-     * @see Recover#recoverRegister
-     *
-     * @param registration to be logged
-     *
-     * @param type of registration, passed back via <code>type</code>
-     *             parameter of corresponding <code>recoverRegister</code>
-     *             call
-     *
-     * @param templates associated with this registration 
-     */
-    void registerOp(StorableResource registration, String type,
-                   StorableObject[] templates);
-
-    /**
-     * Log a <code>renew</code> operation.  We use the expiration, not
-     * the extension, because we don't want to calculate the
-     * expiration relative to when we read the log -- we want to use
-     * the exact expiration granted.
-     *
-     * @see StoredResource
-     *
-     * @param cookie ID of the entry or registration associated with this
-     *               renew
-     *
-     * @param expiration time 
-     */
-    void renewOp(Uuid cookie, long expiration);
-
-    /**
-     * Log a <code>cancel</code> and entry or registration. The entry or
-     * registration associated with <code>cookie</code> will no longer
-     * be recoverable and may be removed from the log records.
-     *
-     * @param cookie ID of the entry or registration to cancel
-     * @param expired is true if the cancel was due to a lease expiration
-     */
-    void cancelOp(Uuid cookie, boolean expired);
-
-    /**
-     * Log a transaction <code>prepare</code>. If there is a restart
-     * before either <code>commitOp</code> or <code>abortOp</code> is
-     * called for the transaction identified by <code>txnId</code>,
-     * all write and take operations associated with <code>txnId</code>
-     * will be recovered and <code>Recover.recoverTransaction</code>
-     * called with the the same <code>txnId</code>.
-     *
-     * @see Recover#recoverTransaction
-     *
-     * @param txnId identifier of the transaction to be prepared
-     *
-     * @param transaction object associated with this transaction
-     */
-    void prepareOp(Long txnId, StorableObject transaction);
-
-    /**
-     * Log a transaction <code>commit</code> or
-     * <code>prepareAndCommit</code>.  The store will commit the write
-     * and take operations associated with <code>txnId</code>. A call
-     * to <code>prepareOP</code> is not required for
-     * <code>commitOp</code> to be called.
-     *
-     * @param txnId identifier of the transaction to be committed 
-     */
-    void commitOp(Long txnId);
-
-    /**
-     * Log a transaction <code>abort</code>. Any write and take operations
-     * associated with <code>txnId</code> will no longer be recoverable
-     * and may be removed from the log records.
-     *
-     * @param txnId identifier of the transaction to be aborted
-     */
-    void abortOp(Long txnId);
-
-    /**
-     * Log the <code>Uuid</code> that identifies the space as a whole.
-     * @see Recover#recoverUuid
-     * @param uuid The <code>Uuid</code> to be stored.
-     */
-    void uuidOp(Uuid uuid);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import net.jini.id.Uuid;
+import org.apache.river.outrigger.proxy.StorableObject;
+import org.apache.river.outrigger.proxy.StorableResource;
+
+
+
+/**
+ * Methods that log an operation.  These are used
+ * when writing to the store. <p>
+ *
+ * The <code>writeOp</code> and <code>takeOp</code> methods can
+ * be called under a transaction. For these methods an in-progress
+ * transaction is indicated by a non-null <code>txnId</code> parameter.
+ * The <code>txnId</code> is an identifier for a transaction.
+ * Each unique transaction must have a unique identifier and all
+ * write or take operations under the same transaction should
+ * use the same identifier. The store does not interpret the
+ * identifier in any way. <p>
+ *
+ * <i>Note: Because the transaction identifier
+ * must be unique, it can not be <code>ServerTransaction.id</code>.
+ * Instead this identifier is the <code>Txn</code> ID. </i> <p>
+ *
+ * When a transaction is closed, the <code>prepareOp</code>,
+ * <code>commitOp</code>, and <code>abortOp</code> methods are
+ * passed the identifier (<code>txnId</code>) for that transaction. If
+ * <code>prepareOp</code> is called and there is a restart, the
+ * <code>txnId</code> passed to the write and take operations will
+ * be passed back to the server via the <code>Recover.recoverWrite</code>
+ * and <code>Recover.recoverTake</code> methods. Likewise the same
+ * identifier will be passed to <code>Recover.recoverTransaction</code>.
+ *
+ * @see Store
+ * @see Recover
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ */
+public interface LogOps {
+
+    /**
+     * Log a server boot (first time start or any reactivation).
+     *
+     * @see Recover#recoverSessionId
+     *
+     * @param time stamp for this boot
+     *
+     * @param sessionId of this boot
+     */
+    void bootOp(long time, long sessionId);
+
+    /**
+     * Log an update to the join state
+     *
+     * @see Recover#recoverJoinState
+     *
+     * @param state to be logged
+     *
+     */
+    void joinStateOp(StorableObject state);
+
+    /**
+     * Log a <code>write</code> operation. If the operation was
+     * performed under a transaction the <code>txnId</code> is
+     * the identifier for that transaction.
+     *
+     * @see Recover#recoverWrite
+     *
+     * @param entry to be logged
+     *
+     * @param txnId transaction identifier or <code>null</code> if
+     *        no transaction is active for this write
+     */
+    void writeOp(StorableResource entry, Long txnId);
+
+    /**
+     * Log a batch <code>write</code> operation. If the operation was
+     * performed under a transaction the <code>txnId</code> is
+     * the identifier for that transaction.
+     *
+     * @see Recover#recoverWrite
+     * @param entries to be logged
+     * @param txnId transaction identifier or <code>null</code> if
+     *        no transaction is active for this write
+     */
+    void writeOp(StorableResource entries[], Long txnId);
+
+    /**
+     * Log a <code>take</code> operation. If the operation was
+     * performed under a transaction the <code>txnId</code> is
+     * the identifier for that transaction.
+     *
+     * @see Recover#recoverTake
+     *
+     * @param cookie ID identifying the entry target to be taken
+     *
+     * @param txnId transaction identifier or <code>null</code> if
+     *        no transaction is active for this take
+     */
+    void takeOp(Uuid cookie, Long txnId);
+
+    /**
+     * Log a batch <code>take</code> operation. If the operation was
+     * performed under a transaction the <code>txnId</code> is
+     * the identifier for that transaction.
+     *
+     * @see Recover#recoverTake
+     *
+     * @param cookies IDs identifying the entries to be taken
+     *
+     * @param txnId transaction identifier or <code>null</code> if
+     *        no transaction is active for this take
+     */
+    void takeOp(Uuid[] cookies, Long txnId);
+
+    /**
+     * Log a <code>notify</code> operation.  Notifications under
+     * transactions are lost at the end of the transaction, so the
+     * only ones that are logged are those that are under no
+     * transaction.
+     *
+     * @see Recover#recoverRegister
+     *
+     * @param registration to be logged
+     *
+     * @param type of registration, passed back via <code>type</code>
+     *             parameter of corresponding <code>recoverRegister</code>
+     *             call
+     *
+     * @param templates associated with this registration 
+     */
+    void registerOp(StorableResource registration, String type,
+                   StorableObject[] templates);
+
+    /**
+     * Log a <code>renew</code> operation.  We use the expiration, not
+     * the extension, because we don't want to calculate the
+     * expiration relative to when we read the log -- we want to use
+     * the exact expiration granted.
+     *
+     * @see StoredResource
+     *
+     * @param cookie ID of the entry or registration associated with this
+     *               renew
+     *
+     * @param expiration time 
+     */
+    void renewOp(Uuid cookie, long expiration);
+
+    /**
+     * Log a <code>cancel</code> of an entry or registration. The entry or
+     * registration associated with <code>cookie</code> will no longer
+     * be recoverable and may be removed from the log records.
+     *
+     * @param cookie ID of the entry or registration to cancel
+     * @param expired is true if the cancel was due to a lease expiration
+     */
+    void cancelOp(Uuid cookie, boolean expired);
+
+    /**
+     * Log a transaction <code>prepare</code>. If there is a restart
+     * before either <code>commitOp</code> or <code>abortOp</code> is
+     * called for the transaction identified by <code>txnId</code>,
+     * all write and take operations associated with <code>txnId</code>
+     * will be recovered and <code>Recover.recoverTransaction</code>
+     * called with the same <code>txnId</code>.
+     *
+     * @see Recover#recoverTransaction
+     *
+     * @param txnId identifier of the transaction to be prepared
+     *
+     * @param transaction object associated with this transaction
+     */
+    void prepareOp(Long txnId, StorableObject transaction);
+
+    /**
+     * Log a transaction <code>commit</code> or
+     * <code>prepareAndCommit</code>.  The store will commit the write
+     * and take operations associated with <code>txnId</code>. A call
+     * to <code>prepareOp</code> is not required for
+     * <code>commitOp</code> to be called.
+     *
+     * @param txnId identifier of the transaction to be committed 
+     */
+    void commitOp(Long txnId);
+
+    /**
+     * Log a transaction <code>abort</code>. Any write and take operations
+     * associated with <code>txnId</code> will no longer be recoverable
+     * and may be removed from the log records.
+     *
+     * @param txnId identifier of the transaction to be aborted
+     */
+    void abortOp(Long txnId);
+
+    /**
+     * Log the <code>Uuid</code> that identifies the space as a whole.
+     * @see Recover#recoverUuid
+     * @param uuid The <code>Uuid</code> to be stored.
+     */
+    void uuidOp(Uuid uuid);
+}

Modified: 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java
 (original)
+++ 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java
 Sun Jul  5 11:41:39 2020
@@ -1,270 +1,270 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.io.IOException;
-import java.rmi.RemoteException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import net.jini.core.event.UnknownEventException;
-import net.jini.config.Configuration;
-import net.jini.config.ConfigurationException;
-import net.jini.security.ProxyPreparer;
-import net.jini.space.JavaSpace;
-
-import org.apache.river.constants.ThrowableConstants;
-import org.apache.river.config.Config;
-import org.apache.river.logging.Levels;
-import org.apache.river.thread.RetryTask;
-import org.apache.river.thread.WakeupManager;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.river.thread.NamedThreadFactory;
-
-/**
- * The notifier thread.  This thread is responsible for notifying
- * objects for which interest has been registered.  It operates in
- * transient space as much as possible.  Pending notifications will be
- * lost when the server goes down, but registrations of interest
- * survive across server crashes for persistent servers.
- *
- * @author Sun Microsystems, Inc.
- *
- * @see JavaSpace#notify
- * @see OutriggerServerImpl#notify
- */
-// @see NotifyChit
-class Notifier implements org.apache.river.constants.TimeConstants {
-    /** 
-     * The object to use for the <code>source</code> when creating
-     * events.
-     */
-    private final JavaSpace source;
-
-    /** Proxy preparer to use on recovered listeners */
-    private final ProxyPreparer recoveredListenerPreparer;
-    
-    /** wakeup manager for <code>NotifyTask</code> */
-    private final WakeupManager wakeupMgr = 
-       new WakeupManager(new WakeupManager.ThreadDesc(null, true));
-    
-    /** pending notifications tasks */
-    private final ExecutorService pending;
-
-    private final static int   MAX_ATTEMPTS = 10;      // max times to retry
-
-    /** Logger for logging event related information */
-    private static final Logger logger = 
-       Logger.getLogger(OutriggerServerImpl.eventLoggerName);
-
-    /**
-     * Create a notifier connected to the given <code>space</code>.
-     * @param source the value to use for the <code>source</code> in
-     *               remote event objects.
-     * @param recoveredListenerPreparer <code>ProxyPreparer</code> to
-     *               apply to recovered listeners.
-     * @param config a source of configuration data.a
-     * @throws ConfigurationException if there is a problem
-     *         with the passed configuration.
-     * @throws NullPointerException if <code>source</code> or 
-     *         <code>config</code> arguments are <code>null</code>.
-     */
-    Notifier(JavaSpace source, ProxyPreparer recoveredListenerPreparer, 
-            Configuration config) 
-       throws ConfigurationException 
-    {
-       if (source == null)
-           throw new NullPointerException("source must be non-null");
-       this.source = source;
-
-       this.recoveredListenerPreparer = recoveredListenerPreparer;
-
-       pending = Config.getNonNullEntry(config,
-           OutriggerServerImpl.COMPONENT_NAME, "notificationsExecutorService", 
-           ExecutorService.class, 
-            new ThreadPoolExecutor(
-                10,
-                10, /* Ignored */
-                15,
-                TimeUnit.SECONDS, 
-                new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */
-                new NamedThreadFactory("OutriggerServerImpl Notifier", false)
-            )
-        );
-    }
-
-    /**
-     * Terminate the notifier, shutting down any threads
-     * it has running. This method can assume that
-     * the constructor completed.
-     */
-    void terminate() {
-       pending.shutdown();
-       wakeupMgr.stop();       
-       wakeupMgr.cancelAll();  
-    }
-
-    /**
-     * Queue up an event for delivery.
-     * @param sender An object that on request will
-     *               attempt to deliver its event
-     *               to the associated listener.
-     * @throws NullPointerException if <code>sender</code> is
-     * <code>null</code>
-     */
-    void enqueueDelivery(EventSender sender) {
-       pending.execute(new NotifyTask(sender));
-    }
-
-    /*
-     * Static stuff for Pending (can't put it in the class, unfortunately).
-     */
-                               // 1 day =hrs  mins secs milliseconds
-    private static final long  MAX_TIME = 1 * DAYS;
-    private static final long  delays[] = {
-                                   1 * SECONDS, 5 * SECONDS,
-                                   10 * SECONDS, 60 * SECONDS, 60 * SECONDS
-                               };
-
-    static {
-       /*
-        * Make the delays the amount of time since the start -- it
-        * is easier to declare the intervals, but the elapsed time is
-        * more <i>useful</i>.
-        */
-       for (int i = 1; i < delays.length; i++)
-           delays[i] += delays[i - 1];
-    }
-
-    /**
-     * A task that represent a notification of matching a particular
-     * template under a given transaction.
-     */
-    private class NotifyTask extends RetryTask {
-       /** Who and what to send a event to. */
-       private final EventSender sender;       
-
-       /**
-        * Create an object to represent this list of chits needing
-        * notification.
-        * @param sender An object that on request will
-        *               attempt to deliver its event
-        *               to the associated listener.
-        * @throws NullPointerException if <code>sender</code> is
-        * <code>null</code>
-        */
-       NotifyTask(EventSender sender) {
-           super(Notifier.this.pending, Notifier.this.wakeupMgr);
-           if (sender == null)
-               throw new NullPointerException("sender must be non-null");
-           this.sender = sender;
-       }
-
-       /**
-        * Try to notify the target.  Return <code>true</code> if the
-        * notification was successful.
-        * <p>
-        * We know that we are the only one dealing with the given chit
-        * because <code>runAfter</code> makes sure of it.
-        */
-       public boolean tryOnce() {
-           long curTime = System.currentTimeMillis();
-           if (curTime - startTime() > MAX_TIME) {
-               if (logger.isLoggable(Levels.FAILED)) {
-                   logger.log(Levels.FAILED, 
-                       "giving up on delivering event, keeping registration");
-               }
-
-               return true;    // just stop here, we are declaring "success"
-           }
-
-           boolean successful = true;  // notification successful?
-           try {
-               sender.sendEvent(source, curTime, recoveredListenerPreparer);
-           } catch (UnknownEventException e) {
-               // they didn't want to know about this, so stop them getting
-               // future notifications, too.
-               logFailure("UnknownEventException", Level.FINER, true, e);
-               sender.cancelRegistration();
-               // this is still "successful" -- we know to stop sending this
-           } catch (RemoteException e) {
-               final int cat = ThrowableConstants.retryable(e);
-
-               if (cat == ThrowableConstants.BAD_INVOCATION ||
-                   cat == ThrowableConstants.BAD_OBJECT)
-               {
-                   // Listener probably bad, retry likely to fail.
-                   logFailure("definite exception", Level.INFO, true, e);
-                   sender.cancelRegistration();
-               } else if (cat == ThrowableConstants.INDEFINITE) {
-                   // try, try, again
-                   logFailure("indefinite exception", Levels.FAILED,
-                              false, e);
-                   successful = false; 
-               } else if (cat == ThrowableConstants.UNCATEGORIZED) {
-                   // Same as above but log differently.
-                   logFailure("uncategorized exception", Level.INFO, false, 
-                              e);
-                   successful = false; 
-               } else {
-                   logger.log(Level.WARNING, "ThrowableConstants.retryable " +
-                       "returned out of range value, " + cat,
-                       new AssertionError(e));
-                   successful = false;                     
-               }
-           } catch (IOException e) {
-               // corrupted listener? unlikely to get better, cancel
-               logFailure("IOException", Level.INFO, true, e);
-               sender.cancelRegistration();
-           } catch (ClassNotFoundException e) {
-               // probably a codebase problem, retry
-               logFailure("ClassNotFoundException", Levels.FAILED, false, e);
-               successful = false;                     
-           } catch (RuntimeException e) {
-               /* bad listener, or preparer, either way unlikely to
-                * get better
-                */
-               logFailure("RuntimeException", Level.INFO, true, e);
-               sender.cancelRegistration();
-           }
-
-           if (!successful && attempt() > MAX_ATTEMPTS) {
-               if (logger.isLoggable(Levels.FAILED)) {
-                   logger.log(Levels.FAILED, 
-                       "giving up on delivering event, keeping registration");
-               }
-               return true;            // as successful as we're going to be
-           }
-
-           return successful;
-       }
-
-       /** Log a failed delivery attempt */
-       private void logFailure(String exceptionDescription, Level level,
-                               boolean terminal, Throwable t) 
-       {
-           if (logger.isLoggable(level)) {
-               logger.log(level, "Encountered " + exceptionDescription +
-                    "while preparing to send/sending event, " +
-                    (terminal?"dropping":"keeping") +  " registration", t);
-           }
-       }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.io.IOException;
+import java.rmi.RemoteException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.core.event.UnknownEventException;
+import net.jini.config.Configuration;
+import net.jini.config.ConfigurationException;
+import net.jini.security.ProxyPreparer;
+import net.jini.space.JavaSpace;
+
+import org.apache.river.constants.ThrowableConstants;
+import org.apache.river.config.Config;
+import org.apache.river.logging.Levels;
+import org.apache.river.thread.wakeup.RetryTask;
+import org.apache.river.thread.wakeup.WakeupManager;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.river.thread.NamedThreadFactory;
+
+/**
+ * The notifier thread.  This thread is responsible for notifying
+ * objects for which interest has been registered.  It operates in
+ * transient space as much as possible.  Pending notifications will be
+ * lost when the server goes down, but registrations of interest
+ * survive across server crashes for persistent servers.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see JavaSpace#notify
+ * @see OutriggerServerImpl#notify
+ */
+// @see NotifyChit
+class Notifier implements org.apache.river.constants.TimeConstants {
+    /** 
+     * The object to use for the <code>source</code> when creating
+     * events.
+     */
+    private final JavaSpace source;
+
+    /** Proxy preparer to use on recovered listeners */
+    private final ProxyPreparer recoveredListenerPreparer;
+    
+    /** wakeup manager for <code>NotifyTask</code> */
+    private final WakeupManager wakeupMgr = 
+       new WakeupManager(new WakeupManager.ThreadDesc(null, true));
+    
+    /** pending notifications tasks */
+    private final ExecutorService pending;
+
+    private final static int   MAX_ATTEMPTS = 10;      // max times to retry
+
+    /** Logger for logging event related information */
+    private static final Logger logger = 
+       Logger.getLogger(OutriggerServerImpl.eventLoggerName);
+
+    /**
+     * Create a notifier connected to the given <code>space</code>.
+     * @param source the value to use for the <code>source</code> in
+     *               remote event objects.
+     * @param recoveredListenerPreparer <code>ProxyPreparer</code> to
+     *               apply to recovered listeners.
+     * @param config a source of configuration data.
+     * @throws ConfigurationException if there is a problem
+     *         with the passed configuration.
+     * @throws NullPointerException if <code>source</code> or 
+     *         <code>config</code> arguments are <code>null</code>.
+     */
+    Notifier(JavaSpace source, ProxyPreparer recoveredListenerPreparer, 
+            Configuration config) 
+       throws ConfigurationException 
+    {
+       if (source == null)
+           throw new NullPointerException("source must be non-null");
+       this.source = source;
+
+       this.recoveredListenerPreparer = recoveredListenerPreparer;
+
+       pending = Config.getNonNullEntry(config,
+           OutriggerServerImpl.COMPONENT_NAME, "notificationsExecutorService", 
+           ExecutorService.class, 
+            new ThreadPoolExecutor(
+                10,
+                10, /* Ignored */
+                15,
+                TimeUnit.SECONDS, 
+                new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */
+                new NamedThreadFactory("OutriggerServerImpl Notifier", false)
+            )
+        );
+    }
+
+    /**
+     * Terminate the notifier, shutting down any threads
+     * it has running. This method can assume that
+     * the constructor completed.
+     */
+    void terminate() {
+       pending.shutdown();
+       wakeupMgr.stop();       
+       wakeupMgr.cancelAll();  
+    }
+
+    /**
+     * Queue up an event for delivery.
+     * @param sender An object that on request will
+     *               attempt to deliver its event
+     *               to the associated listener.
+     * @throws NullPointerException if <code>sender</code> is
+     * <code>null</code>
+     */
+    void enqueueDelivery(EventSender sender) {
+       pending.execute(new NotifyTask(sender));
+    }
+
+    /*
+     * Static stuff for Pending (can't put it in the class, unfortunately).
+     */
+                               // 1 day =hrs  mins secs milliseconds
+    private static final long  MAX_TIME = 1 * DAYS;
+    private static final long  delays[] = {
+                                   1 * SECONDS, 5 * SECONDS,
+                                   10 * SECONDS, 60 * SECONDS, 60 * SECONDS
+                               };
+
+    static {
+       /*
+        * Make the delays the amount of time since the start -- it
+        * is easier to declare the intervals, but the elapsed time is
+        * more <i>useful</i>.
+        */
+       for (int i = 1; i < delays.length; i++)
+           delays[i] += delays[i - 1];
+    }
+
+    /**
+     * A task that represents a notification of matching a particular
+     * template under a given transaction.
+     */
+    private class NotifyTask extends RetryTask {
+       /** Who and what to send an event to. */
+       private final EventSender sender;       
+
+       /**
+        * Create an object to represent this list of chits needing
+        * notification.
+        * @param sender An object that on request will
+        *               attempt to deliver its event
+        *               to the associated listener.
+        * @throws NullPointerException if <code>sender</code> is
+        * <code>null</code>
+        */
+       NotifyTask(EventSender sender) {
+           super(Notifier.this.pending, Notifier.this.wakeupMgr);
+           if (sender == null)
+               throw new NullPointerException("sender must be non-null");
+           this.sender = sender;
+       }
+
+       /**
+        * Try to notify the target.  Return <code>true</code> if the
+        * notification was successful.
+        * <p>
+        * We know that we are the only one dealing with the given chit
+        * because <code>runAfter</code> makes sure of it.
+        */
+       public boolean tryOnce() {
+           long curTime = System.currentTimeMillis();
+           if (curTime - startTime() > MAX_TIME) {
+               if (logger.isLoggable(Levels.FAILED)) {
+                   logger.log(Levels.FAILED, 
+                       "giving up on delivering event, keeping registration");
+               }
+
+               return true;    // just stop here, we are declaring "success"
+           }
+
+           boolean successful = true;  // notification successful?
+           try {
+               sender.sendEvent(source, curTime, recoveredListenerPreparer);
+           } catch (UnknownEventException e) {
+               // they didn't want to know about this, so stop them getting
+               // future notifications, too.
+               logFailure("UnknownEventException", Level.FINER, true, e);
+               sender.cancelRegistration();
+               // this is still "successful" -- we know to stop sending this
+           } catch (RemoteException e) {
+               final int cat = ThrowableConstants.retryable(e);
+
+               if (cat == ThrowableConstants.BAD_INVOCATION ||
+                   cat == ThrowableConstants.BAD_OBJECT)
+               {
+                   // Listener probably bad, retry likely to fail.
+                   logFailure("definite exception", Level.INFO, true, e);
+                   sender.cancelRegistration();
+               } else if (cat == ThrowableConstants.INDEFINITE) {
+                   // try, try, again
+                   logFailure("indefinite exception", Levels.FAILED,
+                              false, e);
+                   successful = false; 
+               } else if (cat == ThrowableConstants.UNCATEGORIZED) {
+                   // Same as above but log differently.
+                   logFailure("uncategorized exception", Level.INFO, false, 
+                              e);
+                   successful = false; 
+               } else {
+                   logger.log(Level.WARNING, "ThrowableConstants.retryable " +
+                       "returned out of range value, " + cat,
+                       new AssertionError(e));
+                   successful = false;                     
+               }
+           } catch (IOException e) {
+               // corrupted listener? unlikely to get better, cancel
+               logFailure("IOException", Level.INFO, true, e);
+               sender.cancelRegistration();
+           } catch (ClassNotFoundException e) {
+               // probably a codebase problem, retry
+               logFailure("ClassNotFoundException", Levels.FAILED, false, e);
+               successful = false;                     
+           } catch (RuntimeException e) {
+               /* bad listener, or preparer, either way unlikely to
+                * get better
+                */
+               logFailure("RuntimeException", Level.INFO, true, e);
+               sender.cancelRegistration();
+           }
+
+           if (!successful && attempt() > MAX_ATTEMPTS) {
+               if (logger.isLoggable(Levels.FAILED)) {
+                   logger.log(Levels.FAILED, 
+                       "giving up on delivering event, keeping registration");
+               }
+               return true;            // as successful as we're going to be
+           }
+
+           return successful;
+       }
+
+       /** Log a failed delivery attempt; terminal means "dropping" is logged */
+       private void logFailure(String exceptionDescription, Level level,
+                               boolean terminal, Throwable t) 
+       {
+           if (logger.isLoggable(level)) {
+               logger.log(level, "Encountered " + exceptionDescription +
+                    " while preparing to send/sending event, " +
+                    (terminal?"dropping":"keeping") +  " registration", t);
+           }
+       }
+    }
+}

Modified: 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java
 (original)
+++ 
river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java
 Sun Jul  5 11:41:39 2020
@@ -30,13 +30,16 @@ import org.apache.river.landlord.FixedLe
 import org.apache.river.landlord.LocalLandlord;
 import org.apache.river.landlord.LeaseFactory;
 import org.apache.river.logging.Levels;
-import org.apache.river.start.LifeCycle;
+import org.apache.river.start.lifecycle.LifeCycle;
 import org.apache.river.api.util.Startable;
+import org.apache.river.outrigger.proxy.StorableObject;
+import org.apache.river.outrigger.proxy.StorableResource;
 
 import net.jini.id.Uuid;
 import net.jini.id.UuidFactory;
 import net.jini.activation.ActivationGroup;
 
+
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationProvider;
 import net.jini.config.ConfigurationException;
@@ -72,6 +75,20 @@ import net.jini.lookup.entry.ServiceInfo
 import net.jini.space.InternalSpaceException;
 import net.jini.space.JavaSpace;
 
+import org.apache.river.outrigger.proxy.AdminProxy;
+import org.apache.river.outrigger.proxy.OutriggerServer;
+import org.apache.river.outrigger.proxy.ConstrainableAdminProxy;
+import org.apache.river.outrigger.proxy.ConstrainableParticipantProxy;
+import org.apache.river.outrigger.proxy.ConstrainableSpaceProxy2;
+import org.apache.river.outrigger.proxy.SpaceProxy2;
+import org.apache.river.outrigger.proxy.ParticipantProxy;
+import org.apache.river.outrigger.proxy.EntryRep;
+import org.apache.river.admin.JavaSpaceAdmin;
+import org.apache.river.outrigger.proxy.MatchSetData;
+import org.apache.river.outrigger.proxy.OutriggerQueryCookie;
+import org.apache.river.outrigger.proxy.ProxyVerifier;
+
+
 import java.io.IOException;
 import java.rmi.MarshalledObject;
 import java.rmi.NoSuchObjectException;


Reply via email to