Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/JoinStateManager.java Sun Jul 5 11:41:39 2020 @@ -1,639 +1,641 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.river.outrigger; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.rmi.MarshalledObject; -import java.rmi.RemoteException; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import net.jini.core.discovery.LookupLocator; -import net.jini.core.entry.Entry; -import net.jini.core.lookup.ServiceID; -import net.jini.discovery.DiscoveryManagement; -import net.jini.discovery.DiscoveryLocatorManagement; -import net.jini.discovery.DiscoveryGroupManagement; -import net.jini.discovery.LookupDiscoveryManager; -import net.jini.lookup.JoinManager; - -import net.jini.config.Configuration; -import net.jini.config.ConfigurationException; -import net.jini.security.ProxyPreparer; - -import org.apache.river.config.Config; -import org.apache.river.logging.Levels; -import net.jini.io.MarshalledInstance; - -/** - * <code>JoinStateManager</code> provides a utility that manages - * a service's join state (optionally persisting that state) and - * manages the join protocol protocol on behalf of the service. - * - * @author Sun Microsystems, Inc. - * - * @see JoinManager - */ -// @see JoinAdminState -class JoinStateManager implements StorableObject<JoinStateManager> { - /** <code>ProxyPreparer</code> for <code>LookupLocators</code> */ - private volatile ProxyPreparer lookupLocatorPreparer; - - /** - * Object used to find lookups. Has to implement DiscoveryManagement - * and DiscoveryLocatorManagement as well as DiscoveryGroupManagement. - */ - private volatile DiscoveryGroupManagement dgm; - - /** - * <code>JoinManager</code> that is handling the details of binding - * into Jini lookup services. - */ - private volatile JoinManager mgr; - - /** - * The object that is coordinating our persistent state. - */ - private volatile LogOps log; - - /** - * The list of attributes to start with. 
This field is only used - * to carry data from the <code>restore</code> method to the - * <code>startManager</code> method. The current set of attributes - * is kept by <code>mgr</code>. This field is nulled out by - * <code>startManager</code>. - */ - private volatile Entry[] attributes; - - /** - * The list of <code>LookupLocator</code>s to start with. This - * field is only used to carry data from the <code>restore</code> - * method to the <code>startManager</code> method. The current set - * of attributes is kept by <code>mgr</code>. This field is nulled - * out by <code>startManager</code>. - */ - private volatile LookupLocator locators[]; - - - /** - * The list of group names to start with. This field is only used - * to carry data from the <code>restore</code> method to the - * <code>startManager</code> method. The current set of attributes - * is kept by <code>mgr</code>. This field is nulled out by - * <code>startManager</code>. - */ - private volatile String groups[]; - - /** - * Conceptually, true if this is the first time this - * service has come up, implemented as if there was - * no previous state then this is the first time. - */ - private volatile boolean initial = true; - - /** Logger for logging join related information */ - private static final Logger logger = - Logger.getLogger(OutriggerServerImpl.joinLoggerName); - - /** - * Simple constructor. - */ - JoinStateManager() { } - - /** - * Start the manager. Start looking for lookup and registering - * with them. - * @param config object to use to obtain - * <code>DiscoveryManagement</code> object, and if - * this is the initial incarnation of this service, - * the object used to get the initial set of groups, - * locators, and deployer defined attributes. - * @param log object used to persist the manager's state, may be - * <code>null</code>. - * @param serviceID The <code>ServiceID</code> to register - * under. - * @param service The proxy object to register with lookups. 
- * @param baseAttributes Any attributes the implementation wants - * attached, only used if this is the - * initial incarnation. - * @throws IOException if the is problem persisting the - * initial state or in starting discovery. - * @throws ConfigurationException if the configuration - * is invalid. - * @throws NullPointerException if <code>config</code>, - * or <code>serviceID</code> is <code>null</code>. - */ - void startManager(Configuration config, LogOps log, Object service, - ServiceID serviceID, Entry[] baseAttributes) - throws IOException, ConfigurationException - { - // Default do nothing preparer - final ProxyPreparer defaultPreparer = - new net.jini.security.BasicProxyPreparer(); - - if (serviceID == null) - throw new NullPointerException("serviceID can't be null"); - - this.log = log; - - lookupLocatorPreparer = - (ProxyPreparer)Config.getNonNullEntry(config, - OutriggerServerImpl.COMPONENT_NAME, "lookupLocatorPreparer", - ProxyPreparer.class, defaultPreparer); - - dgm = (DiscoveryGroupManagement) - Config.getNonNullEntry(config, - OutriggerServerImpl.COMPONENT_NAME, "discoveryManager", - DiscoveryGroupManagement.class, - new LookupDiscoveryManager( - DiscoveryGroupManagement.NO_GROUPS, null, null, - config)); - - if (!(dgm instanceof DiscoveryManagement)) - throw throwNewConfigurationException("Entry for component " + - OutriggerServerImpl.COMPONENT_NAME + ", name " + - "discoveryManager must implement " + - "net.jini.discovery.DiscoveryGroupManagement"); - - if (!(dgm instanceof DiscoveryLocatorManagement)) - throw throwNewConfigurationException("Entry for component " + - OutriggerServerImpl.COMPONENT_NAME + ", name " + - "discoveryManager must implement " + - "net.jini.discovery.DiscoveryLocatorManagement"); - - final String[] toCheck = dgm.getGroups(); - if (toCheck == null || toCheck.length != 0) - throw throwNewConfigurationException("Entry for component " + - OutriggerServerImpl.COMPONENT_NAME + ", name " + - "discoveryManager must be initially 
configured with no " + - "groups"); - - if (((DiscoveryLocatorManagement)dgm).getLocators().length != 0) - throw throwNewConfigurationException("Entry for component " + - OutriggerServerImpl.COMPONENT_NAME + ", name " + - "discoveryManager must be initially configured with no " + - "locators"); - - // if this is the first incarnation, consult config for groups, - // locators and attributes. - if (initial) { - groups = (String[]) - config.getEntry(OutriggerServerImpl.COMPONENT_NAME, - "initialLookupGroups", String[].class, - new String[]{""}); - - locators = (LookupLocator[]) - Config.getNonNullEntry(config, - OutriggerServerImpl.COMPONENT_NAME, - "initialLookupLocators", LookupLocator[].class, - new LookupLocator[0]); - - final Entry[] cAttrs = (Entry[]) - Config.getNonNullEntry(config, - OutriggerServerImpl.COMPONENT_NAME, - "initialLookupAttributes", Entry[].class, new Entry[0]); - - if (cAttrs.length == 0) { - attributes = baseAttributes; - } else { - attributes = - new Entry[cAttrs.length + baseAttributes.length]; - System.arraycopy(baseAttributes, 0, attributes, - 0, baseAttributes.length); - System.arraycopy(cAttrs, 0, attributes, - baseAttributes.length, cAttrs.length); - } - } else { - /* recovery : if there are any locators get and - * use recoveredLookupLocatorPreparer - */ - if (locators.length > 0) { - final ProxyPreparer recoveredLookupLocatorPreparer = - (ProxyPreparer)Config.getNonNullEntry(config, - OutriggerServerImpl.COMPONENT_NAME, - "recoveredLookupLocatorPreparer", ProxyPreparer.class, - defaultPreparer); - - final List prepared = new java.util.LinkedList(); - for (int i=0; i<locators.length; i++) { - final LookupLocator locator = locators[i]; - try { - prepared.add(recoveredLookupLocatorPreparer. 
- prepareProxy(locator)); - } catch (Throwable t) { - logger.log(Level.INFO, - "Encountered exception preparing lookup locator " + - "for " + locator + ", dropping locator", t); - } - } - - locators = - (LookupLocator[])prepared.toArray(new LookupLocator[0]); - } - } - - // Now that we have groups & locators (either from - // a previous incarnation or from the config) start discovery. - if (logger.isLoggable(Level.CONFIG)) { - if (groups == null) { - logger.log(Level.CONFIG, "joining all groups"); - } else if (groups.length == 0) { - logger.log(Level.CONFIG, "joining no groups"); - } else { - final StringBuffer buf = new StringBuffer(); - buf.append("joining groups:"); - for (int i=0; i<groups.length; i++) { - if (i != 0) - buf.append(","); - - buf.append("\""); - buf.append(groups[i]); - buf.append("\""); - } - logger.log(Level.CONFIG, buf.toString()); - } - - if (locators.length == 0) { - logger.log(Level.CONFIG, "joining no specific registrars"); - } else { - final StringBuffer buf = new StringBuffer(); - buf.append("joining the specific registrars:"); - for (int i=0; i<locators.length; i++) { - if (i != 0) - buf.append(", "); - - buf.append(locators[i]); - } - logger.log(Level.CONFIG, buf.toString()); - } - - if (attributes.length == 0) { - logger.log(Level.CONFIG, "registering no attributes"); - } else { - final StringBuffer buf = new StringBuffer(); - buf.append("registering the attributes:"); - for (int i=0; i<attributes.length; i++) { - if (i != 0) - buf.append(", "); - - buf.append(attributes[i]); - } - logger.log(Level.CONFIG, buf.toString()); - } - } - - dgm.setGroups(groups); - ((DiscoveryLocatorManagement)dgm).setLocators(locators); - - mgr = new JoinManager(service, attributes, serviceID, - (DiscoveryManagement)dgm, null, config); - - // Once we are running we don't need the attributes, - // locators, and groups fields, null them out (the - // state is in the mgr and dgm. 
- attributes = null; - groups = null; - locators = null; - - // Now that we have state, make sure it is written to disk. - update(); - } - - /** - * Make a good faith attempt to terminate - * discovery, and cancel any lookup registrations. */ - public void destroy() { - // Unregister with lookup - - // Terminate the JoinManager first so it will not call - // into the dgm after it has been terminated. - if (mgr != null) - mgr.terminate(); - - if (dgm != null) - ((DiscoveryManagement)dgm).terminate(); - } - - /* Basically we are implementing JoinAdmin, for get methods we just - * delegate to JoinManager, for the set methods we call - * JoinManager to and then persist the change by calling the - * appropriate method on our JoinAdminState. If the call on our - * JoinAdminState throws an IOException we throw a runtime - * exception since JoinAdmin methods don't let us throw a - * IOException - */ - - /** - * Get the current attribute sets for the service. - * - * @return the current attribute sets for the service - */ - public Entry[] getLookupAttributes() { - return mgr.getAttributes(); - } - - /** - * Add attribute sets for the service. The resulting set will be used - * for all future joins. The attribute sets are also added to all - * currently-joined lookup services. - * - * @param attrSets the attribute sets to add - * @throws java.rmi.RuntimeException if the change can not be persisted. - */ - public void addLookupAttributes(Entry[] attrSets) { - mgr.addAttributes(attrSets, true); - update(); - } - - /** - * Modify the current attribute sets, using the same semantics as - * ServiceRegistration.modifyAttributes. The resulting set will be used - * for all future joins. The same modifications are also made to all - * currently-joined lookup services. - * - * @param attrSetTemplates the templates for matching attribute sets - * @param attrSets the modifications to make to matching sets - * - * @throws java.rmi.RuntimeException if the change can not be persisted. 
- * @see net.jini.core.lookup.ServiceRegistration#modifyAttributes - */ - public void modifyLookupAttributes(Entry[] attrSetTemplates, - Entry[] attrSets) { - mgr.modifyAttributes(attrSetTemplates, attrSets, true); - update(); - } - - /** - * Get the list of groups to join. An empty array means the service - * joins no groups (as opposed to "all" groups). - * - * @return an array of groups to join. An empty array means the service - * joins no groups (as opposed to "all" groups). - * @see #setLookupGroups - */ - public String[] getLookupGroups() { - return dgm.getGroups(); - } - - /** - * Add new groups to the set to join. Lookup services in the new - * groups will be discovered and joined. - * - * @param groups groups to join - * @throws java.rmi.RuntimeException if the change can not be persisted. - * @see #removeLookupGroups - */ - public void addLookupGroups(String[] groups) { - try { - dgm.addGroups(groups); - } catch (IOException e) { - throw propagateIOException("Could not change groups", e); - } - update(); - } - - /** - * Remove groups from the set to join. Leases are cancelled at lookup - * services that are not members of any of the remaining groups. - * - * @param groups groups to leave - * @throws java.rmi.RuntimeException if the change can not be persisted. - * @see #addLookupGroups - */ - public void removeLookupGroups(String[] groups) { - dgm.removeGroups(groups); - update(); - } - - /** - * Replace the list of groups to join with a new list. Leases are - * cancelled at lookup services that are not members of any of the - * new groups. Lookup services in the new groups will be discovered - * and joined. - * - * @param groups groups to join - * @throws java.rmi.RuntimeException if the change can not be persisted. 
- * @see #getLookupGroups - */ - public void setLookupGroups(String[] groups) { - try { - dgm.setGroups(groups); - } catch (IOException e) { - throw propagateIOException("Could not change groups", e); - } - update(); - } - - /** - * Get the list of locators of specific lookup services to join. - * - * @return the list of locators of specific lookup services to join - * @see #setLookupLocators - */ - public LookupLocator[] getLookupLocators() { - return ((DiscoveryLocatorManagement)dgm).getLocators(); - } - - /** - * Add locators for specific new lookup services to join. The new - * lookup services will be discovered and joined. - * - * @param locators locators of specific lookup services to join - * @throws java.rmi.RuntimeException if the change can not be persisted. - * @see #removeLookupLocators - */ - public void addLookupLocators(LookupLocator[] locators) - throws RemoteException - { - prepareLocators(locators); - ((DiscoveryLocatorManagement)dgm).addLocators(locators); - update(); - } - - /** - * Remove locators for specific lookup services from the set to join. - * Any leases held at the lookup services are cancelled. - * - * @param locators locators of specific lookup services to leave - * @throws java.rmi.RuntimeException if the change can not be persisted. - * @see #addLookupLocators - */ - public void removeLookupLocators(LookupLocator[] locators) - throws RemoteException - { - prepareLocators(locators); - ((DiscoveryLocatorManagement)dgm).removeLocators(locators); - update(); - } - - /** - * Replace the list of locators of specific lookup services to join - * with a new list. Leases are cancelled at lookup services that were - * in the old list but are not in the new list. Any new lookup services - * will be discovered and joined. - * - * @param locators locators of specific lookup services to join - * @throws java.rmi.RuntimeException if the change can not be persisted. 
- * @see #getLookupLocators - */ - public void setLookupLocators(LookupLocator[] locators) - throws RemoteException - { - prepareLocators(locators); - ((DiscoveryLocatorManagement)dgm).setLocators(locators); - update(); - } - - private void update() { - if (log != null) - log.joinStateOp(this); - } - - /** - * Apply <code>lookupLocatorPreparer</code> to each locator in the - * array, replacing the original locator with the result of the - * <code>prepareProxy</code> call. If call fails with an exception - * throw that exception. - * @param locators the <code>LookupLocator</code>s to be prepared. - * @throws RemoteException if preparation of any of the locators - * does. - * @throws SecurityException if preparation of any of the locators - * does. - */ - private void prepareLocators(LookupLocator[] locators) - throws RemoteException - { - for (int i = 0; i<locators.length; i++) - locators[i] = (LookupLocator)lookupLocatorPreparer.prepareProxy( - locators[i]); - } - - /** - * Utility method to write out an array of entities to an - * <code>ObjectOutputStream</code>. Can be recovered by a call - * to <code>readAttributes()</code> - * <p> - * Packages each attribute in its own <code>MarshalledObject</code> so - * a bad codebase on an attribute class will not corrupt the whole array. - */ - // @see JoinAdminActivationState#readAttributes - static private void writeAttributes(Entry[] attributes, - ObjectOutputStream out) - throws IOException - { - // Need to package each attribute in its own marshaled object, - // this makes sure that the attribute's code base is preserved - // and when we unpack to discard attributes who's codebase - // has been lost without throwing away those we can still deal with. 
- - out.writeInt(attributes.length); - for (int i=0; i<attributes.length; i++) { - out.writeObject( - new MarshalledInstance(attributes[i]).convertToMarshalledObject()); - } - } - - /** - * Utility method to read in an array of entities from a - * <code>ObjectInputStream</code>. Array should have been written - * by a call to <code>writeAttributes()</code> - * <p> - * - * Will try and recover as many attributes as possible. - * Attributes which can't be recovered won't be returned but they - * will remain in the log. - */ - // @see JoinAdminActivationState#writeAttributes - static private Entry[] readAttributes(ObjectInputStream in) - throws IOException, ClassNotFoundException - { - final List entries = new java.util.LinkedList(); - final int objectCount = in.readInt(); - for (int i=0; i<objectCount; i++) { - try { - MarshalledObject mo = (MarshalledObject)in.readObject(); - entries.add( new MarshalledInstance(mo).get(false)); - } catch (IOException e) { - logger.log(Level.INFO, "Encountered IOException recovering " + - "attribute, dropping attribute", e); - } catch (ClassNotFoundException e) { - logger.log(Level.INFO, "Encountered ClassNotFoundException " + - "recovering attribute, dropping attribute", e); - } - } - - return (Entry[])entries.toArray(new Entry[0]); - } - - // ----------------------------------- - // Methods required by StorableObject - // ----------------------------------- - - // inherit doc comment - public void store(ObjectOutputStream out) throws IOException { - writeAttributes(mgr.getAttributes(), out); - out.writeObject(((DiscoveryLocatorManagement)dgm).getLocators()); - out.writeObject(dgm.getGroups()); - } - - // inherit doc comment - public JoinStateManager restore(ObjectInputStream in) - throws IOException, ClassNotFoundException - { - initial = false; - attributes = readAttributes(in); - locators = (LookupLocator [])in.readObject(); - groups = (String [])in.readObject(); - return this; - } - - /** - * Construct, log, and throw a new 
ConfigurationException with - * the given message. - */ - private static ConfigurationException throwNewConfigurationException( - String msg) - throws ConfigurationException - { - final ConfigurationException e = new ConfigurationException(msg); - - if (logger.isLoggable(Levels.FAILED)) { - logger.log(Levels.FAILED, msg, e); - } - - throw e; - } - - /** - * Propagate an IOException by wrapping it in a RuntimeException. - * Performs appropriate logging. - */ - private static RuntimeException propagateIOException(String msg, - IOException nested) - { - final RuntimeException e = new RuntimeException(msg, nested); - if (logger.isLoggable(Levels.FAILED)) { - logger.log(Levels.FAILED, msg, e); - } - - throw e; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.river.outrigger; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.rmi.MarshalledObject; +import java.rmi.RemoteException; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.jini.core.discovery.LookupLocator; +import net.jini.core.entry.Entry; +import net.jini.core.lookup.ServiceID; +import net.jini.discovery.DiscoveryManagement; +import net.jini.discovery.DiscoveryLocatorManagement; +import net.jini.discovery.DiscoveryGroupManagement; +import net.jini.discovery.LookupDiscoveryManager; +import net.jini.lookup.JoinManager; +import org.apache.river.outrigger.proxy.StorableObject; + + +import net.jini.config.Configuration; +import net.jini.config.ConfigurationException; +import net.jini.security.ProxyPreparer; + +import org.apache.river.config.Config; +import org.apache.river.logging.Levels; +import net.jini.io.MarshalledInstance; + +/** + * <code>JoinStateManager</code> provides a utility that manages + * a service's join state (optionally persisting that state) and + * manages the join protocol protocol on behalf of the service. + * + * @author Sun Microsystems, Inc. + * + * @see JoinManager + */ +// @see JoinAdminState +class JoinStateManager implements StorableObject<JoinStateManager> { + /** <code>ProxyPreparer</code> for <code>LookupLocators</code> */ + private volatile ProxyPreparer lookupLocatorPreparer; + + /** + * Object used to find lookups. Has to implement DiscoveryManagement + * and DiscoveryLocatorManagement as well as DiscoveryGroupManagement. + */ + private volatile DiscoveryGroupManagement dgm; + + /** + * <code>JoinManager</code> that is handling the details of binding + * into Jini lookup services. + */ + private volatile JoinManager mgr; + + /** + * The object that is coordinating our persistent state. + */ + private volatile LogOps log; + + /** + * The list of attributes to start with. 
This field is only used + * to carry data from the <code>restore</code> method to the + * <code>startManager</code> method. The current set of attributes + * is kept by <code>mgr</code>. This field is nulled out by + * <code>startManager</code>. + */ + private volatile Entry[] attributes; + + /** + * The list of <code>LookupLocator</code>s to start with. This + * field is only used to carry data from the <code>restore</code> + * method to the <code>startManager</code> method. The current set + * of attributes is kept by <code>mgr</code>. This field is nulled + * out by <code>startManager</code>. + */ + private volatile LookupLocator locators[]; + + + /** + * The list of group names to start with. This field is only used + * to carry data from the <code>restore</code> method to the + * <code>startManager</code> method. The current set of attributes + * is kept by <code>mgr</code>. This field is nulled out by + * <code>startManager</code>. + */ + private volatile String groups[]; + + /** + * Conceptually, true if this is the first time this + * service has come up, implemented as if there was + * no previous state then this is the first time. + */ + private volatile boolean initial = true; + + /** Logger for logging join related information */ + private static final Logger logger = + Logger.getLogger(OutriggerServerImpl.joinLoggerName); + + /** + * Simple constructor. + */ + JoinStateManager() { } + + /** + * Start the manager. Start looking for lookup and registering + * with them. + * @param config object to use to obtain + * <code>DiscoveryManagement</code> object, and if + * this is the initial incarnation of this service, + * the object used to get the initial set of groups, + * locators, and deployer defined attributes. + * @param log object used to persist the manager's state, may be + * <code>null</code>. + * @param serviceID The <code>ServiceID</code> to register + * under. + * @param service The proxy object to register with lookups. 
+ * @param baseAttributes Any attributes the implementation wants + * attached, only used if this is the + * initial incarnation. + * @throws IOException if the is problem persisting the + * initial state or in starting discovery. + * @throws ConfigurationException if the configuration + * is invalid. + * @throws NullPointerException if <code>config</code>, + * or <code>serviceID</code> is <code>null</code>. + */ + void startManager(Configuration config, LogOps log, Object service, + ServiceID serviceID, Entry[] baseAttributes) + throws IOException, ConfigurationException + { + // Default do nothing preparer + final ProxyPreparer defaultPreparer = + new net.jini.security.BasicProxyPreparer(); + + if (serviceID == null) + throw new NullPointerException("serviceID can't be null"); + + this.log = log; + + lookupLocatorPreparer = + (ProxyPreparer)Config.getNonNullEntry(config, + OutriggerServerImpl.COMPONENT_NAME, "lookupLocatorPreparer", + ProxyPreparer.class, defaultPreparer); + + dgm = (DiscoveryGroupManagement) + Config.getNonNullEntry(config, + OutriggerServerImpl.COMPONENT_NAME, "discoveryManager", + DiscoveryGroupManagement.class, + new LookupDiscoveryManager( + DiscoveryGroupManagement.NO_GROUPS, null, null, + config)); + + if (!(dgm instanceof DiscoveryManagement)) + throw throwNewConfigurationException("Entry for component " + + OutriggerServerImpl.COMPONENT_NAME + ", name " + + "discoveryManager must implement " + + "net.jini.discovery.DiscoveryGroupManagement"); + + if (!(dgm instanceof DiscoveryLocatorManagement)) + throw throwNewConfigurationException("Entry for component " + + OutriggerServerImpl.COMPONENT_NAME + ", name " + + "discoveryManager must implement " + + "net.jini.discovery.DiscoveryLocatorManagement"); + + final String[] toCheck = dgm.getGroups(); + if (toCheck == null || toCheck.length != 0) + throw throwNewConfigurationException("Entry for component " + + OutriggerServerImpl.COMPONENT_NAME + ", name " + + "discoveryManager must be initially 
configured with no " + + "groups"); + + if (((DiscoveryLocatorManagement)dgm).getLocators().length != 0) + throw throwNewConfigurationException("Entry for component " + + OutriggerServerImpl.COMPONENT_NAME + ", name " + + "discoveryManager must be initially configured with no " + + "locators"); + + // if this is the first incarnation, consult config for groups, + // locators and attributes. + if (initial) { + groups = (String[]) + config.getEntry(OutriggerServerImpl.COMPONENT_NAME, + "initialLookupGroups", String[].class, + new String[]{""}); + + locators = (LookupLocator[]) + Config.getNonNullEntry(config, + OutriggerServerImpl.COMPONENT_NAME, + "initialLookupLocators", LookupLocator[].class, + new LookupLocator[0]); + + final Entry[] cAttrs = (Entry[]) + Config.getNonNullEntry(config, + OutriggerServerImpl.COMPONENT_NAME, + "initialLookupAttributes", Entry[].class, new Entry[0]); + + if (cAttrs.length == 0) { + attributes = baseAttributes; + } else { + attributes = + new Entry[cAttrs.length + baseAttributes.length]; + System.arraycopy(baseAttributes, 0, attributes, + 0, baseAttributes.length); + System.arraycopy(cAttrs, 0, attributes, + baseAttributes.length, cAttrs.length); + } + } else { + /* recovery : if there are any locators get and + * use recoveredLookupLocatorPreparer + */ + if (locators.length > 0) { + final ProxyPreparer recoveredLookupLocatorPreparer = + (ProxyPreparer)Config.getNonNullEntry(config, + OutriggerServerImpl.COMPONENT_NAME, + "recoveredLookupLocatorPreparer", ProxyPreparer.class, + defaultPreparer); + + final List prepared = new java.util.LinkedList(); + for (int i=0; i<locators.length; i++) { + final LookupLocator locator = locators[i]; + try { + prepared.add(recoveredLookupLocatorPreparer. 
+ prepareProxy(locator)); + } catch (Throwable t) { + logger.log(Level.INFO, + "Encountered exception preparing lookup locator " + + "for " + locator + ", dropping locator", t); + } + } + + locators = + (LookupLocator[])prepared.toArray(new LookupLocator[0]); + } + } + + // Now that we have groups & locators (either from + // a previous incarnation or from the config) start discovery. + if (logger.isLoggable(Level.CONFIG)) { + if (groups == null) { + logger.log(Level.CONFIG, "joining all groups"); + } else if (groups.length == 0) { + logger.log(Level.CONFIG, "joining no groups"); + } else { + final StringBuffer buf = new StringBuffer(); + buf.append("joining groups:"); + for (int i=0; i<groups.length; i++) { + if (i != 0) + buf.append(","); + + buf.append("\""); + buf.append(groups[i]); + buf.append("\""); + } + logger.log(Level.CONFIG, buf.toString()); + } + + if (locators.length == 0) { + logger.log(Level.CONFIG, "joining no specific registrars"); + } else { + final StringBuffer buf = new StringBuffer(); + buf.append("joining the specific registrars:"); + for (int i=0; i<locators.length; i++) { + if (i != 0) + buf.append(", "); + + buf.append(locators[i]); + } + logger.log(Level.CONFIG, buf.toString()); + } + + if (attributes.length == 0) { + logger.log(Level.CONFIG, "registering no attributes"); + } else { + final StringBuffer buf = new StringBuffer(); + buf.append("registering the attributes:"); + for (int i=0; i<attributes.length; i++) { + if (i != 0) + buf.append(", "); + + buf.append(attributes[i]); + } + logger.log(Level.CONFIG, buf.toString()); + } + } + + dgm.setGroups(groups); + ((DiscoveryLocatorManagement)dgm).setLocators(locators); + + mgr = new JoinManager(service, attributes, serviceID, + (DiscoveryManagement)dgm, null, config); + + // Once we are running we don't need the attributes, + // locators, and groups fields, null them out (the + // state is in the mgr and dgm. 
+ attributes = null; + groups = null; + locators = null; + + // Now that we have state, make sure it is written to disk. + update(); + } + + /** + * Make a good faith attempt to terminate + * discovery, and cancel any lookup registrations. */ + public void destroy() { + // Unregister with lookup + + // Terminate the JoinManager first so it will not call + // into the dgm after it has been terminated. + if (mgr != null) + mgr.terminate(); + + if (dgm != null) + ((DiscoveryManagement)dgm).terminate(); + } + + /* Basically we are implementing JoinAdmin, for get methods we just + * delegate to JoinManager, for the set methods we call + * JoinManager to and then persist the change by calling the + * appropriate method on our JoinAdminState. If the call on our + * JoinAdminState throws an IOException we throw a runtime + * exception since JoinAdmin methods don't let us throw a + * IOException + */ + + /** + * Get the current attribute sets for the service. + * + * @return the current attribute sets for the service + */ + public Entry[] getLookupAttributes() { + return mgr.getAttributes(); + } + + /** + * Add attribute sets for the service. The resulting set will be used + * for all future joins. The attribute sets are also added to all + * currently-joined lookup services. + * + * @param attrSets the attribute sets to add + * @throws java.rmi.RuntimeException if the change can not be persisted. + */ + public void addLookupAttributes(Entry[] attrSets) { + mgr.addAttributes(attrSets, true); + update(); + } + + /** + * Modify the current attribute sets, using the same semantics as + * ServiceRegistration.modifyAttributes. The resulting set will be used + * for all future joins. The same modifications are also made to all + * currently-joined lookup services. + * + * @param attrSetTemplates the templates for matching attribute sets + * @param attrSets the modifications to make to matching sets + * + * @throws java.rmi.RuntimeException if the change can not be persisted. 
+ * @see net.jini.core.lookup.ServiceRegistration#modifyAttributes + */ + public void modifyLookupAttributes(Entry[] attrSetTemplates, + Entry[] attrSets) { + mgr.modifyAttributes(attrSetTemplates, attrSets, true); + update(); + } + + /** + * Get the list of groups to join. An empty array means the service + * joins no groups (as opposed to "all" groups). + * + * @return an array of groups to join. An empty array means the service + * joins no groups (as opposed to "all" groups). + * @see #setLookupGroups + */ + public String[] getLookupGroups() { + return dgm.getGroups(); + } + + /** + * Add new groups to the set to join. Lookup services in the new + * groups will be discovered and joined. + * + * @param groups groups to join + * @throws RuntimeException if the change can not be persisted. + * @see #removeLookupGroups + */ + public void addLookupGroups(String[] groups) { + try { + dgm.addGroups(groups); + } catch (IOException e) { + throw propagateIOException("Could not change groups", e); + } + update(); + } + + /** + * Remove groups from the set to join. Leases are cancelled at lookup + * services that are not members of any of the remaining groups. + * + * @param groups groups to leave + * @throws RuntimeException if the change can not be persisted. + * @see #addLookupGroups + */ + public void removeLookupGroups(String[] groups) { + dgm.removeGroups(groups); + update(); + } + + /** + * Replace the list of groups to join with a new list. Leases are + * cancelled at lookup services that are not members of any of the + * new groups. Lookup services in the new groups will be discovered + * and joined. + * + * @param groups groups to join + * @throws RuntimeException if the change can not be persisted. 
+ * @see #getLookupGroups + */ + public void setLookupGroups(String[] groups) { + try { + dgm.setGroups(groups); + } catch (IOException e) { + throw propagateIOException("Could not change groups", e); + } + update(); + } + + /** + * Get the list of locators of specific lookup services to join. + * + * @return the list of locators of specific lookup services to join + * @see #setLookupLocators + */ + public LookupLocator[] getLookupLocators() { + return ((DiscoveryLocatorManagement)dgm).getLocators(); + } + + /** + * Add locators for specific new lookup services to join. The new + * lookup services will be discovered and joined. + * + * @param locators locators of specific lookup services to join + * @throws RuntimeException if the change can not be persisted. + * @see #removeLookupLocators + */ + public void addLookupLocators(LookupLocator[] locators) + throws RemoteException + { + prepareLocators(locators); + ((DiscoveryLocatorManagement)dgm).addLocators(locators); + update(); + } + + /** + * Remove locators for specific lookup services from the set to join. + * Any leases held at the lookup services are cancelled. + * + * @param locators locators of specific lookup services to leave + * @throws RuntimeException if the change can not be persisted. + * @see #addLookupLocators + */ + public void removeLookupLocators(LookupLocator[] locators) + throws RemoteException + { + prepareLocators(locators); + ((DiscoveryLocatorManagement)dgm).removeLocators(locators); + update(); + } + + /** + * Replace the list of locators of specific lookup services to join + * with a new list. Leases are cancelled at lookup services that were + * in the old list but are not in the new list. Any new lookup services + * will be discovered and joined. + * + * @param locators locators of specific lookup services to join + * @throws RuntimeException if the change can not be persisted. 
+ * @see #getLookupLocators + */ + public void setLookupLocators(LookupLocator[] locators) + throws RemoteException + { + prepareLocators(locators); + ((DiscoveryLocatorManagement)dgm).setLocators(locators); + update(); + } + + private void update() { + if (log != null) + log.joinStateOp(this); + } + + /** + * Apply <code>lookupLocatorPreparer</code> to each locator in the + * array, replacing the original locator with the result of the + * <code>prepareProxy</code> call. If call fails with an exception + * throw that exception. + * @param locators the <code>LookupLocator</code>s to be prepared. + * @throws RemoteException if preparation of any of the locators + * does. + * @throws SecurityException if preparation of any of the locators + * does. + */ + private void prepareLocators(LookupLocator[] locators) + throws RemoteException + { + for (int i = 0; i<locators.length; i++) + locators[i] = (LookupLocator)lookupLocatorPreparer.prepareProxy( + locators[i]); + } + + /** + * Utility method to write out an array of entities to an + * <code>ObjectOutputStream</code>. Can be recovered by a call + * to <code>readAttributes()</code> + * <p> + * Packages each attribute in its own <code>MarshalledObject</code> so + * a bad codebase on an attribute class will not corrupt the whole array. + */ + // @see JoinAdminActivationState#readAttributes + static private void writeAttributes(Entry[] attributes, + ObjectOutputStream out) + throws IOException + { + // Need to package each attribute in its own marshaled object, + // this makes sure that the attribute's code base is preserved + // and when we unpack to discard attributes who's codebase + // has been lost without throwing away those we can still deal with. 
+ + out.writeInt(attributes.length); + for (int i=0; i<attributes.length; i++) { + out.writeObject( + new MarshalledInstance(attributes[i]).convertToMarshalledObject()); + } + } + + /** + * Utility method to read in an array of entities from a + * <code>ObjectInputStream</code>. Array should have been written + * by a call to <code>writeAttributes()</code> + * <p> + * + * Will try and recover as many attributes as possible. + * Attributes which can't be recovered won't be returned but they + * will remain in the log. + */ + // @see JoinAdminActivationState#writeAttributes + static private Entry[] readAttributes(ObjectInputStream in) + throws IOException, ClassNotFoundException + { + final List entries = new java.util.LinkedList(); + final int objectCount = in.readInt(); + for (int i=0; i<objectCount; i++) { + try { + MarshalledObject mo = (MarshalledObject)in.readObject(); + entries.add( new MarshalledInstance(mo).get(false)); + } catch (IOException e) { + logger.log(Level.INFO, "Encountered IOException recovering " + + "attribute, dropping attribute", e); + } catch (ClassNotFoundException e) { + logger.log(Level.INFO, "Encountered ClassNotFoundException " + + "recovering attribute, dropping attribute", e); + } + } + + return (Entry[])entries.toArray(new Entry[0]); + } + + // ----------------------------------- + // Methods required by StorableObject + // ----------------------------------- + + // inherit doc comment + public void store(ObjectOutputStream out) throws IOException { + writeAttributes(mgr.getAttributes(), out); + out.writeObject(((DiscoveryLocatorManagement)dgm).getLocators()); + out.writeObject(dgm.getGroups()); + } + + // inherit doc comment + public JoinStateManager restore(ObjectInputStream in) + throws IOException, ClassNotFoundException + { + initial = false; + attributes = readAttributes(in); + locators = (LookupLocator [])in.readObject(); + groups = (String [])in.readObject(); + return this; + } + + /** + * Construct, log, and throw a new 
ConfigurationException with + * the given message. + */ + private static ConfigurationException throwNewConfigurationException( + String msg) + throws ConfigurationException + { + final ConfigurationException e = new ConfigurationException(msg); + + if (logger.isLoggable(Levels.FAILED)) { + logger.log(Levels.FAILED, msg, e); + } + + throw e; + } + + /** + * Propagate an IOException by wrapping it in a RuntimeException. + * Performs appropriate logging. + */ + private static RuntimeException propagateIOException(String msg, + IOException nested) + { + final RuntimeException e = new RuntimeException(msg, nested); + if (logger.isLoggable(Levels.FAILED)) { + logger.log(Levels.FAILED, msg, e); + } + + throw e; + } +}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/LogOps.java Sun Jul 5 11:41:39 2020 @@ -1,217 +1,221 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger; - -import net.jini.id.Uuid; - -/** - * Methods that log an operation. These are used - * when writing to the store. <p> - * - * The <code>writeOp</code> and <code>takeOp</code> methods can - * be called under a transaction. For these methods an in-progress - * transaction is indicated by a non-null <code>txnId</code> parameter. 
- * The <code>txnId</code> is an identifier for a transaction. - * Each unique transaction must have a unique identifier and all - * write or take operations under the same transaction should - * use the same identifier. The store does not interpret the - * identifier in any way. <p> - * - * <i>Note: Because the transaction identifier - * must be unique, it can not be <code>ServerTransaction.id</code>. - * Instead this identifier is the <code>Txn</code> ID. </i> <p> - * - * When a transaction is closed, the <code>prepareOp</code>, - * <code>commitOp</code>, and <code>abortOp</code> methods are - * passed the identifier (<code>txnId</code>) for that transaction. If - * <code>prepareOp</code> is called and there is a restart, the - * <code>txnId</code> passed to the write and take operations will - * be passed back to the server via the <code>Recover.recoverWrite</code> - * and <code>Recover.recoverTake</code> methods. Likewise the same - * identifier will be passed to <code>Recover.recoverTransaction</code>. - * - * @see Store - * @see Recover - * - * @author Sun Microsystems, Inc. - * - */ -public interface LogOps { - - /** - * Log a server boot (first time start or any reactivation). - * - * @see Recover#recoverSessionId - * - * @param time stamp for this boot - * - * @param sessionId of this boot - */ - void bootOp(long time, long sessionId); - - /** - * Log an update to the join state - * - * @see Recover#recoverJoinState - * - * @param state to be logged - * - */ - void joinStateOp(StorableObject state); - - /** - * Log a <code>write</code> operation. If the operation was - * performed under a transaction the <code>txnId</code> is - * the identifier for that transaction. - * - * @see Recover#recoverWrite - * - * @param entry to be logged - * - * @param txnId transaction identifier or <code>null</code> if - * no transaction is active for this write - */ - void writeOp(StorableResource entry, Long txnId); - - /** - * Log a batch <code>write</code> operation. 
If the operation was - * performed under a transaction the <code>txnId</code> is - * the identifier for that transaction. - * - * @see Recover#recoverWrite - * @param entries to be logged - * @param txnId transaction identifier or <code>null</code> if - * no transaction is active for this write - */ - void writeOp(StorableResource entries[], Long txnId); - - /** - * Log a <code>take</code> operation. If the operation was - * performed under a transaction the <code>txnId</code> is - * the identifier for that transaction. - * - * @see Recover#recoverTake - * - * @param cookie ID identifying the entry target to be taken - * - * @param txnId transaction identifier or <code>null</code> if - * no transaction is active for this take - */ - void takeOp(Uuid cookie, Long txnId); - - /** - * Log a batch <code>take</code> operation. If the operation was - * performed under a transaction the <code>txnId</code> is - * the identifier for that transaction. - * - * @see Recover#recoverTake - * - * @param cookies IDs identifying the entries to be taken - * - * @param txnId transaction identifier or <code>null</code> if - * no transaction is active for this take - */ - void takeOp(Uuid[] cookies, Long txnId); - - /** - * Log a <code>notify</code> operation. Notifications under - * transactions are lost at the end of the transaction, so the - * only ones that are logged are those that are under no - * transaction. - * - * @see Recover#recoverRegister - * - * @param registration to be logged - * - * @param type of registration, passed back via <code>type</code> - * parameter of corresponding <code>recoverRegister</code> - * call - * - * @param templates associated with this registration - */ - void registerOp(StorableResource registration, String type, - StorableObject[] templates); - - /** - * Log a <code>renew</code> operation. 
We use the expiration, not - * the extension, because we don't want to calculate the - * expiration relative to when we read the log -- we want to use - * the exact expiration granted. - * - * @see StoredResource - * - * @param cookie ID of the entry or registration associated with this - * renew - * - * @param expiration time - */ - void renewOp(Uuid cookie, long expiration); - - /** - * Log a <code>cancel</code> and entry or registration. The entry or - * registration associated with <code>cookie</code> will no longer - * be recoverable and may be removed from the log records. - * - * @param cookie ID of the entry or registration to cancel - * @param expired is true if the cancel was due to a lease expiration - */ - void cancelOp(Uuid cookie, boolean expired); - - /** - * Log a transaction <code>prepare</code>. If there is a restart - * before either <code>commitOp</code> or <code>abortOp</code> is - * called for the transaction identified by <code>txnId</code>, - * all write and take operations associated with <code>txnId</code> - * will be recovered and <code>Recover.recoverTransaction</code> - * called with the the same <code>txnId</code>. - * - * @see Recover#recoverTransaction - * - * @param txnId identifier of the transaction to be prepared - * - * @param transaction object associated with this transaction - */ - void prepareOp(Long txnId, StorableObject transaction); - - /** - * Log a transaction <code>commit</code> or - * <code>prepareAndCommit</code>. The store will commit the write - * and take operations associated with <code>txnId</code>. A call - * to <code>prepareOP</code> is not required for - * <code>commitOp</code> to be called. - * - * @param txnId identifier of the transaction to be committed - */ - void commitOp(Long txnId); - - /** - * Log a transaction <code>abort</code>. Any write and take operations - * associated with <code>txnId</code> will no longer be recoverable - * and may be removed from the log records. 
- * - * @param txnId identifier of the transaction to be aborted - */ - void abortOp(Long txnId); - - /** - * Log the <code>Uuid</code> that identifies the space as a whole. - * @see Recover#recoverUuid - * @param uuid The <code>Uuid</code> to be stored. - */ - void uuidOp(Uuid uuid); -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger; + +import net.jini.id.Uuid; +import org.apache.river.outrigger.proxy.StorableObject; +import org.apache.river.outrigger.proxy.StorableResource; + + + +/** + * Methods that log an operation. These are used + * when writing to the store. <p> + * + * The <code>writeOp</code> and <code>takeOp</code> methods can + * be called under a transaction. For these methods an in-progress + * transaction is indicated by a non-null <code>txnId</code> parameter. + * The <code>txnId</code> is an identifier for a transaction. + * Each unique transaction must have a unique identifier and all + * write or take operations under the same transaction should + * use the same identifier. The store does not interpret the + * identifier in any way. <p> + * + * <i>Note: Because the transaction identifier + * must be unique, it can not be <code>ServerTransaction.id</code>. 
+ * Instead this identifier is the <code>Txn</code> ID. </i> <p> + * + * When a transaction is closed, the <code>prepareOp</code>, + * <code>commitOp</code>, and <code>abortOp</code> methods are + * passed the identifier (<code>txnId</code>) for that transaction. If + * <code>prepareOp</code> is called and there is a restart, the + * <code>txnId</code> passed to the write and take operations will + * be passed back to the server via the <code>Recover.recoverWrite</code> + * and <code>Recover.recoverTake</code> methods. Likewise the same + * identifier will be passed to <code>Recover.recoverTransaction</code>. + * + * @see Store + * @see Recover + * + * @author Sun Microsystems, Inc. + * + */ +public interface LogOps { + + /** + * Log a server boot (first time start or any reactivation). + * + * @see Recover#recoverSessionId + * + * @param time stamp for this boot + * + * @param sessionId of this boot + */ + void bootOp(long time, long sessionId); + + /** + * Log an update to the join state + * + * @see Recover#recoverJoinState + * + * @param state to be logged + * + */ + void joinStateOp(StorableObject state); + + /** + * Log a <code>write</code> operation. If the operation was + * performed under a transaction the <code>txnId</code> is + * the identifier for that transaction. + * + * @see Recover#recoverWrite + * + * @param entry to be logged + * + * @param txnId transaction identifier or <code>null</code> if + * no transaction is active for this write + */ + void writeOp(StorableResource entry, Long txnId); + + /** + * Log a batch <code>write</code> operation. If the operation was + * performed under a transaction the <code>txnId</code> is + * the identifier for that transaction. + * + * @see Recover#recoverWrite + * @param entries to be logged + * @param txnId transaction identifier or <code>null</code> if + * no transaction is active for this write + */ + void writeOp(StorableResource entries[], Long txnId); + + /** + * Log a <code>take</code> operation. 
If the operation was + * performed under a transaction the <code>txnId</code> is + * the identifier for that transaction. + * + * @see Recover#recoverTake + * + * @param cookie ID identifying the entry target to be taken + * + * @param txnId transaction identifier or <code>null</code> if + * no transaction is active for this take + */ + void takeOp(Uuid cookie, Long txnId); + + /** + * Log a batch <code>take</code> operation. If the operation was + * performed under a transaction the <code>txnId</code> is + * the identifier for that transaction. + * + * @see Recover#recoverTake + * + * @param cookies IDs identifying the entries to be taken + * + * @param txnId transaction identifier or <code>null</code> if + * no transaction is active for this take + */ + void takeOp(Uuid[] cookies, Long txnId); + + /** + * Log a <code>notify</code> operation. Notifications under + * transactions are lost at the end of the transaction, so the + * only ones that are logged are those that are under no + * transaction. + * + * @see Recover#recoverRegister + * + * @param registration to be logged + * + * @param type of registration, passed back via <code>type</code> + * parameter of corresponding <code>recoverRegister</code> + * call + * + * @param templates associated with this registration + */ + void registerOp(StorableResource registration, String type, + StorableObject[] templates); + + /** + * Log a <code>renew</code> operation. We use the expiration, not + * the extension, because we don't want to calculate the + * expiration relative to when we read the log -- we want to use + * the exact expiration granted. + * + * @see StoredResource + * + * @param cookie ID of the entry or registration associated with this + * renew + * + * @param expiration time + */ + void renewOp(Uuid cookie, long expiration); + + /** + * Log a <code>cancel</code> and entry or registration. 
The entry or + * registration associated with <code>cookie</code> will no longer + * be recoverable and may be removed from the log records. + * + * @param cookie ID of the entry or registration to cancel + * @param expired is true if the cancel was due to a lease expiration + */ + void cancelOp(Uuid cookie, boolean expired); + + /** + * Log a transaction <code>prepare</code>. If there is a restart + * before either <code>commitOp</code> or <code>abortOp</code> is + * called for the transaction identified by <code>txnId</code>, + * all write and take operations associated with <code>txnId</code> + * will be recovered and <code>Recover.recoverTransaction</code> + * called with the same <code>txnId</code>. + * + * @see Recover#recoverTransaction + * + * @param txnId identifier of the transaction to be prepared + * + * @param transaction object associated with this transaction + */ + void prepareOp(Long txnId, StorableObject transaction); + + /** + * Log a transaction <code>commit</code> or + * <code>prepareAndCommit</code>. The store will commit the write + * and take operations associated with <code>txnId</code>. A call + * to <code>prepareOp</code> is not required for + * <code>commitOp</code> to be called. + * + * @param txnId identifier of the transaction to be committed + */ + void commitOp(Long txnId); + + /** + * Log a transaction <code>abort</code>. Any write and take operations + * associated with <code>txnId</code> will no longer be recoverable + * and may be removed from the log records. + * + * @param txnId identifier of the transaction to be aborted + */ + void abortOp(Long txnId); + + /** + * Log the <code>Uuid</code> that identifies the space as a whole. + * @see Recover#recoverUuid + * @param uuid The <code>Uuid</code> to be stored. 
+ */ + void uuidOp(Uuid uuid); +} Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Notifier.java Sun Jul 5 11:41:39 2020 @@ -1,270 +1,270 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.river.outrigger; - -import java.io.IOException; -import java.rmi.RemoteException; -import java.util.logging.Level; -import java.util.logging.Logger; -import net.jini.core.event.UnknownEventException; -import net.jini.config.Configuration; -import net.jini.config.ConfigurationException; -import net.jini.security.ProxyPreparer; -import net.jini.space.JavaSpace; - -import org.apache.river.constants.ThrowableConstants; -import org.apache.river.config.Config; -import org.apache.river.logging.Levels; -import org.apache.river.thread.RetryTask; -import org.apache.river.thread.WakeupManager; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.river.thread.NamedThreadFactory; - -/** - * The notifier thread. This thread is responsible for notifying - * objects for which interest has been registered. It operates in - * transient space as much as possible. Pending notifications will be - * lost when the server goes down, but registrations of interest - * survive across server crashes for persistent servers. - * - * @author Sun Microsystems, Inc. - * - * @see JavaSpace#notify - * @see OutriggerServerImpl#notify - */ -// @see NotifyChit -class Notifier implements org.apache.river.constants.TimeConstants { - /** - * The object to use for the <code>source</code> when creating - * events. 
- */ - private final JavaSpace source; - - /** Proxy preparer to use on recovered listeners */ - private final ProxyPreparer recoveredListenerPreparer; - - /** wakeup manager for <code>NotifyTask</code> */ - private final WakeupManager wakeupMgr = - new WakeupManager(new WakeupManager.ThreadDesc(null, true)); - - /** pending notifications tasks */ - private final ExecutorService pending; - - private final static int MAX_ATTEMPTS = 10; // max times to retry - - /** Logger for logging event related information */ - private static final Logger logger = - Logger.getLogger(OutriggerServerImpl.eventLoggerName); - - /** - * Create a notifier connected to the given <code>space</code>. - * @param source the value to use for the <code>source</code> in - * remote event objects. - * @param recoveredListenerPreparer <code>ProxyPreparer</code> to - * apply to recovered listeners. - * @param config a source of configuration data.a - * @throws ConfigurationException if there is a problem - * with the passed configuration. - * @throws NullPointerException if <code>source</code> or - * <code>config</code> arguments are <code>null</code>. - */ - Notifier(JavaSpace source, ProxyPreparer recoveredListenerPreparer, - Configuration config) - throws ConfigurationException - { - if (source == null) - throw new NullPointerException("source must be non-null"); - this.source = source; - - this.recoveredListenerPreparer = recoveredListenerPreparer; - - pending = Config.getNonNullEntry(config, - OutriggerServerImpl.COMPONENT_NAME, "notificationsExecutorService", - ExecutorService.class, - new ThreadPoolExecutor( - 10, - 10, /* Ignored */ - 15, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */ - new NamedThreadFactory("OutriggerServerImpl Notifier", false) - ) - ); - } - - /** - * Terminate the notifier, shutting down any threads - * it has running. This method can assume that - * the constructor completed. 
- */ - void terminate() { - pending.shutdown(); - wakeupMgr.stop(); - wakeupMgr.cancelAll(); - } - - /** - * Queue up an event for delivery. - * @param sender An object that on request will - * attempt to deliver its event - * to the associated listener. - * @throws NullPointerException if <code>sender</code> is - * <code>null</code> - */ - void enqueueDelivery(EventSender sender) { - pending.execute(new NotifyTask(sender)); - } - - /* - * Static stuff for Pending (can't put it in the class, unfortunately). - */ - // 1 day =hrs mins secs milliseconds - private static final long MAX_TIME = 1 * DAYS; - private static final long delays[] = { - 1 * SECONDS, 5 * SECONDS, - 10 * SECONDS, 60 * SECONDS, 60 * SECONDS - }; - - static { - /* - * Make the delays the amount of time since the start -- it - * is easier to declare the intervals, but the elapsed time is - * more <i>useful</i>. - */ - for (int i = 1; i < delays.length; i++) - delays[i] += delays[i - 1]; - } - - /** - * A task that represent a notification of matching a particular - * template under a given transaction. - */ - private class NotifyTask extends RetryTask { - /** Who and what to send a event to. */ - private final EventSender sender; - - /** - * Create an object to represent this list of chits needing - * notification. - * @param sender An object that on request will - * attempt to deliver its event - * to the associated listener. - * @throws NullPointerException if <code>sender</code> is - * <code>null</code> - */ - NotifyTask(EventSender sender) { - super(Notifier.this.pending, Notifier.this.wakeupMgr); - if (sender == null) - throw new NullPointerException("sender must be non-null"); - this.sender = sender; - } - - /** - * Try to notify the target. Return <code>true</code> if the - * notification was successful. - * <p> - * We know that we are the only one dealing with the given chit - * because <code>runAfter</code> makes sure of it. 
- */ - public boolean tryOnce() { - long curTime = System.currentTimeMillis(); - if (curTime - startTime() > MAX_TIME) { - if (logger.isLoggable(Levels.FAILED)) { - logger.log(Levels.FAILED, - "giving up on delivering event, keeping registration"); - } - - return true; // just stop here, we are declaring "success" - } - - boolean successful = true; // notification successful? - try { - sender.sendEvent(source, curTime, recoveredListenerPreparer); - } catch (UnknownEventException e) { - // they didn't want to know about this, so stop them getting - // future notifications, too. - logFailure("UnknownEventException", Level.FINER, true, e); - sender.cancelRegistration(); - // this is still "successful" -- we know to stop sending this - } catch (RemoteException e) { - final int cat = ThrowableConstants.retryable(e); - - if (cat == ThrowableConstants.BAD_INVOCATION || - cat == ThrowableConstants.BAD_OBJECT) - { - // Listener probably bad, retry likely to fail. - logFailure("definite exception", Level.INFO, true, e); - sender.cancelRegistration(); - } else if (cat == ThrowableConstants.INDEFINITE) { - // try, try, again - logFailure("indefinite exception", Levels.FAILED, - false, e); - successful = false; - } else if (cat == ThrowableConstants.UNCATEGORIZED) { - // Same as above but log differently. - logFailure("uncategorized exception", Level.INFO, false, - e); - successful = false; - } else { - logger.log(Level.WARNING, "ThrowableConstants.retryable " + - "returned out of range value, " + cat, - new AssertionError(e)); - successful = false; - } - } catch (IOException e) { - // corrupted listener? 
unlikely to get better, cancel - logFailure("IOException", Level.INFO, true, e); - sender.cancelRegistration(); - } catch (ClassNotFoundException e) { - // probably a codebase problem, retry - logFailure("ClassNotFoundException", Levels.FAILED, false, e); - successful = false; - } catch (RuntimeException e) { - /* bad listener, or preparer, either way unlikely to - * get better - */ - logFailure("RuntimeException", Level.INFO, true, e); - sender.cancelRegistration(); - } - - if (!successful && attempt() > MAX_ATTEMPTS) { - if (logger.isLoggable(Levels.FAILED)) { - logger.log(Levels.FAILED, - "giving up on delivering event, keeping registration"); - } - return true; // as successful as we're going to be - } - - return successful; - } - - /** Log a failed delivery attempt */ - private void logFailure(String exceptionDescription, Level level, - boolean terminal, Throwable t) - { - if (logger.isLoggable(level)) { - logger.log(level, "Encountered " + exceptionDescription + - "while preparing to send/sending event, " + - (terminal?"dropping":"keeping") + " registration", t); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.river.outrigger; + +import java.io.IOException; +import java.rmi.RemoteException; +import java.util.logging.Level; +import java.util.logging.Logger; +import net.jini.core.event.UnknownEventException; +import net.jini.config.Configuration; +import net.jini.config.ConfigurationException; +import net.jini.security.ProxyPreparer; +import net.jini.space.JavaSpace; + +import org.apache.river.constants.ThrowableConstants; +import org.apache.river.config.Config; +import org.apache.river.logging.Levels; +import org.apache.river.thread.wakeup.RetryTask; +import org.apache.river.thread.wakeup.WakeupManager; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.river.thread.NamedThreadFactory; + +/** + * The notifier thread. This thread is responsible for notifying + * objects for which interest has been registered. It operates in + * transient space as much as possible. Pending notifications will be + * lost when the server goes down, but registrations of interest + * survive across server crashes for persistent servers. + * + * @author Sun Microsystems, Inc. + * + * @see JavaSpace#notify + * @see OutriggerServerImpl#notify + */ +// @see NotifyChit +class Notifier implements org.apache.river.constants.TimeConstants { + /** + * The object to use for the <code>source</code> when creating + * events. 
+ */ + private final JavaSpace source; + + /** Proxy preparer to use on recovered listeners */ + private final ProxyPreparer recoveredListenerPreparer; + + /** wakeup manager for <code>NotifyTask</code> */ + private final WakeupManager wakeupMgr = + new WakeupManager(new WakeupManager.ThreadDesc(null, true)); + + /** pending notifications tasks */ + private final ExecutorService pending; + + private final static int MAX_ATTEMPTS = 10; // max times to retry + + /** Logger for logging event related information */ + private static final Logger logger = + Logger.getLogger(OutriggerServerImpl.eventLoggerName); + + /** + * Create a notifier connected to the given <code>space</code>. + * @param source the value to use for the <code>source</code> in + * remote event objects. + * @param recoveredListenerPreparer <code>ProxyPreparer</code> to + * apply to recovered listeners. + * @param config a source of configuration data.a + * @throws ConfigurationException if there is a problem + * with the passed configuration. + * @throws NullPointerException if <code>source</code> or + * <code>config</code> arguments are <code>null</code>. + */ + Notifier(JavaSpace source, ProxyPreparer recoveredListenerPreparer, + Configuration config) + throws ConfigurationException + { + if (source == null) + throw new NullPointerException("source must be non-null"); + this.source = source; + + this.recoveredListenerPreparer = recoveredListenerPreparer; + + pending = Config.getNonNullEntry(config, + OutriggerServerImpl.COMPONENT_NAME, "notificationsExecutorService", + ExecutorService.class, + new ThreadPoolExecutor( + 10, + 10, /* Ignored */ + 15, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */ + new NamedThreadFactory("OutriggerServerImpl Notifier", false) + ) + ); + } + + /** + * Terminate the notifier, shutting down any threads + * it has running. This method can assume that + * the constructor completed. 
+ */ + void terminate() { + pending.shutdown(); + wakeupMgr.stop(); + wakeupMgr.cancelAll(); + } + + /** + * Queue up an event for delivery. + * @param sender An object that on request will + * attempt to deliver its event + * to the associated listener. + * @throws NullPointerException if <code>sender</code> is + * <code>null</code> + */ + void enqueueDelivery(EventSender sender) { + pending.execute(new NotifyTask(sender)); + } + + /* + * Static stuff for Pending (can't put it in the class, unfortunately). + */ + // 1 day =hrs mins secs milliseconds + private static final long MAX_TIME = 1 * DAYS; + private static final long delays[] = { + 1 * SECONDS, 5 * SECONDS, + 10 * SECONDS, 60 * SECONDS, 60 * SECONDS + }; + + static { + /* + * Make the delays the amount of time since the start -- it + * is easier to declare the intervals, but the elapsed time is + * more <i>useful</i>. + */ + for (int i = 1; i < delays.length; i++) + delays[i] += delays[i - 1]; + } + + /** + * A task that represent a notification of matching a particular + * template under a given transaction. + */ + private class NotifyTask extends RetryTask { + /** Who and what to send a event to. */ + private final EventSender sender; + + /** + * Create an object to represent this list of chits needing + * notification. + * @param sender An object that on request will + * attempt to deliver its event + * to the associated listener. + * @throws NullPointerException if <code>sender</code> is + * <code>null</code> + */ + NotifyTask(EventSender sender) { + super(Notifier.this.pending, Notifier.this.wakeupMgr); + if (sender == null) + throw new NullPointerException("sender must be non-null"); + this.sender = sender; + } + + /** + * Try to notify the target. Return <code>true</code> if the + * notification was successful. + * <p> + * We know that we are the only one dealing with the given chit + * because <code>runAfter</code> makes sure of it. 
+ */ + public boolean tryOnce() { + long curTime = System.currentTimeMillis(); + if (curTime - startTime() > MAX_TIME) { + if (logger.isLoggable(Levels.FAILED)) { + logger.log(Levels.FAILED, + "giving up on delivering event, keeping registration"); + } + + return true; // just stop here, we are declaring "success" + } + + boolean successful = true; // notification successful? + try { + sender.sendEvent(source, curTime, recoveredListenerPreparer); + } catch (UnknownEventException e) { + // they didn't want to know about this, so stop them getting + // future notifications, too. + logFailure("UnknownEventException", Level.FINER, true, e); + sender.cancelRegistration(); + // this is still "successful" -- we know to stop sending this + } catch (RemoteException e) { + final int cat = ThrowableConstants.retryable(e); + + if (cat == ThrowableConstants.BAD_INVOCATION || + cat == ThrowableConstants.BAD_OBJECT) + { + // Listener probably bad, retry likely to fail. + logFailure("definite exception", Level.INFO, true, e); + sender.cancelRegistration(); + } else if (cat == ThrowableConstants.INDEFINITE) { + // try, try, again + logFailure("indefinite exception", Levels.FAILED, + false, e); + successful = false; + } else if (cat == ThrowableConstants.UNCATEGORIZED) { + // Same as above but log differently. + logFailure("uncategorized exception", Level.INFO, false, + e); + successful = false; + } else { + logger.log(Level.WARNING, "ThrowableConstants.retryable " + + "returned out of range value, " + cat, + new AssertionError(e)); + successful = false; + } + } catch (IOException e) { + // corrupted listener? 
unlikely to get better, cancel + logFailure("IOException", Level.INFO, true, e); + sender.cancelRegistration(); + } catch (ClassNotFoundException e) { + // probably a codebase problem, retry + logFailure("ClassNotFoundException", Levels.FAILED, false, e); + successful = false; + } catch (RuntimeException e) { + /* bad listener, or preparer, either way unlikely to + * get better + */ + logFailure("RuntimeException", Level.INFO, true, e); + sender.cancelRegistration(); + } + + if (!successful && attempt() > MAX_ATTEMPTS) { + if (logger.isLoggable(Levels.FAILED)) { + logger.log(Levels.FAILED, + "giving up on delivering event, keeping registration"); + } + return true; // as successful as we're going to be + } + + return successful; + } + + /** Log a failed delivery attempt */ + private void logFailure(String exceptionDescription, Level level, + boolean terminal, Throwable t) + { + if (logger.isLoggable(level)) { + logger.log(level, "Encountered " + exceptionDescription + + "while preparing to send/sending event, " + + (terminal?"dropping":"keeping") + " registration", t); + } + } + } +} Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/OutriggerServerImpl.java Sun Jul 5 11:41:39 2020 @@ -30,13 +30,16 @@ import org.apache.river.landlord.FixedLe import 
org.apache.river.landlord.LocalLandlord; import org.apache.river.landlord.LeaseFactory; import org.apache.river.logging.Levels; -import org.apache.river.start.LifeCycle; +import org.apache.river.start.lifecycle.LifeCycle; import org.apache.river.api.util.Startable; +import org.apache.river.outrigger.proxy.StorableObject; +import org.apache.river.outrigger.proxy.StorableResource; import net.jini.id.Uuid; import net.jini.id.UuidFactory; import net.jini.activation.ActivationGroup; + import net.jini.config.Configuration; import net.jini.config.ConfigurationProvider; import net.jini.config.ConfigurationException; @@ -72,6 +75,20 @@ import net.jini.lookup.entry.ServiceInfo import net.jini.space.InternalSpaceException; import net.jini.space.JavaSpace; +import org.apache.river.outrigger.proxy.AdminProxy; +import org.apache.river.outrigger.proxy.OutriggerServer; +import org.apache.river.outrigger.proxy.ConstrainableAdminProxy; +import org.apache.river.outrigger.proxy.ConstrainableParticipantProxy; +import org.apache.river.outrigger.proxy.ConstrainableSpaceProxy2; +import org.apache.river.outrigger.proxy.SpaceProxy2; +import org.apache.river.outrigger.proxy.ParticipantProxy; +import org.apache.river.outrigger.proxy.EntryRep; +import org.apache.river.admin.JavaSpaceAdmin; +import org.apache.river.outrigger.proxy.MatchSetData; +import org.apache.river.outrigger.proxy.OutriggerQueryCookie; +import org.apache.river.outrigger.proxy.ProxyVerifier; + + import java.io.IOException; import java.rmi.MarshalledObject; import java.rmi.NoSuchObjectException;
