Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1634322&r1=1634321&r2=1634322&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Sun Oct 26 13:17:28 2014 @@ -1,4378 +1,4033 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.jini.lookup; - -import org.apache.river.impl.thread.DependencyLinker; -import au.net.zeus.collection.RC; -import au.net.zeus.collection.Ref; -import au.net.zeus.collection.Referrer; -import com.sun.jini.logging.Levels; -import com.sun.jini.lookup.entry.LookupAttributes; -import com.sun.jini.proxy.BasicProxyTrustVerifier; -import java.io.IOException; -import java.rmi.RemoteException; -import java.rmi.server.ExportException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; -import net.jini.config.Configuration; -import net.jini.config.ConfigurationException; -import net.jini.config.EmptyConfiguration; -import net.jini.config.NoSuchEntryException; -import net.jini.core.entry.Entry; -import net.jini.core.event.EventRegistration; -import net.jini.core.event.RemoteEvent; -import net.jini.core.event.RemoteEventListener; -import net.jini.core.event.UnknownEventException; -import net.jini.core.lease.Lease; -import net.jini.core.lookup.ServiceEvent; -import net.jini.core.lookup.ServiceID; -import net.jini.core.lookup.ServiceItem; -import net.jini.core.lookup.ServiceMatches; -import net.jini.core.lookup.ServiceRegistrar; -import net.jini.core.lookup.ServiceTemplate; -import net.jini.discovery.DiscoveryEvent; -import net.jini.discovery.DiscoveryListener; -import net.jini.discovery.DiscoveryManagement; -import net.jini.discovery.LookupDiscoveryManager; -import net.jini.export.Exporter; -import net.jini.io.MarshalledInstance; -import net.jini.jeri.BasicILFactory; -import net.jini.jeri.BasicJeriExporter; -import net.jini.jeri.tcp.TcpServerEndpoint; -import net.jini.lease.LeaseListener; -import net.jini.lease.LeaseRenewalEvent; -import net.jini.lease.LeaseRenewalManager; -import net.jini.security.BasicProxyPreparer; -import net.jini.security.ProxyPreparer; -import net.jini.security.TrustVerifier; -import net.jini.security.proxytrust.ServerProxyTrust; -import org.apache.river.api.util.FutureObserver; -import org.apache.river.api.util.FutureObserver.ObservableFuture; -import org.apache.river.api.util.FutureObserver.Subscribeable; -import org.apache.river.api.util.FutureObserver.Subscriber; -import org.apache.river.impl.thread.ExtensibleExecutorService; -import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory; -import org.apache.river.impl.thread.NamedThreadFactory; -import org.apache.river.impl.thread.ObservableFutureTask; - -/** - * The <code>ServiceDiscoveryManager</code> class is a helper utility class - * that any client-like entity can use to "discover" services registered - * with any number of lookup services of interest. On behalf of such - * entities, this class maintains - as much as possible - up-to-date - * state information about both the lookup services the entity wishes - * to query, and the services the entity wishes to acquire and use. - * By maintaining current service state information, the entity can - * implement efficient mechanisms for service access and usage. - * <p> - * There are three basic usage patterns for this class. In order of - * importance and typical usage, those patterns are: - * <p> - * <ul> - * <li> The entity requests that the <code>ServiceDiscoveryManager</code> - * create a cache (an instance of - * {@link net.jini.lookup.LookupCache LookupCache}) which will - * asynchronously "discover", and locally store, references - * to services that match criteria defined by the entity; services - * which are registered with one or more lookup services managed - * by the <code>ServiceDiscoveryManager</code> on behalf of the entity. - * The cache can be viewed as a set of service references that the - * entity can access locally as needed through one of the public, - * non-remote methods provided in the cache's interface. Thus, rather - * than making costly remote queries of multiple lookup services at - * the point in time when the entity needs the service, the entity - * can simply make local queries on the cache for the services that - * the cache acquired and stored at a prior time. An entity should - * employ this pattern when the entity must make <i>frequent</i> - * queries for multiple services. By populating the cache with - * multiple instances of the desired services, redundancy in the - * availability of those services can be provided. Thus, if an - * instance of a service is found to be unavailable when needed, - * the entity can execute a local query on the cache rather than - * one or more remote queries on the lookup services to acquire - * an instance that is available. To employ this pattern, the entity - * invokes the method - * {@link net.jini.lookup.ServiceDiscoveryManager#createLookupCache - * createLookupCache}. - * <li> The entity can register with the event mechanism provided by the - * <code>ServiceDiscoveryManager</code>. This event mechanism allows the - * entity to request that it be notified when a service of interest - * is discovered for the first time, or has encountered a state change - * such as removal from all lookup services, or attribute set changes. - * Although interacting with a local cache of services in the way - * described in the first pattern can be very useful to entities that - * need frequent access to multiple services, some client-like - * entities may wish to interact with the cache in a reactive manner. - * For example, an entity such as a service browser typically wishes - * to be notified of the arrival of new services of interest as well - * as any changes in the state of the current services in the cache. - * In these situations, polling for such changes is usually viewed as - * undesirable. If the cache were to also provide an event mechanism - * with notification semantics, the needs of entities that employ - * either pattern can be satisfied. To employ this pattern, the entity - * must create a cache and supply it with an instance of the - * {@link net.jini.lookup.ServiceDiscoveryListener - * ServiceDiscoveryListener} interface that will receive instances of - * {@link net.jini.lookup.ServiceDiscoveryEvent ServiceDiscoveryEvent} - * when events of interest, related to the services in the cache, occur. - * <li> The entity, through the public API of the - * <code>ServiceDiscoveryManager</code>, can directly query the lookup - * services managed by the <code>ServiceDiscoveryManager</code> for - * services of interest; employing semantics similar to the semantics - * employed in a typical lookup service query made through the - * {@link net.jini.core.lookup.ServiceRegistrar ServiceRegistrar} - * interface. Such queries will result in a remote call being made at - * the same time the service is needed (unlike the first pattern, in - * which remote calls typically occur prior to the time the service is - * needed). This pattern may be useful to entities needing to find - * services on an infrequent basis, or when the cost of making a remote - * call is outweighed by the overhead of maintaining a local cache (for - * example, due to limited resources). Although an entity that needs - * to query lookup service(s) can certainly make such queries through - * the {@link net.jini.core.lookup.ServiceRegistrar ServiceRegistrar} - * interface, the <code>ServiceDiscoveryManager</code> provides a broad - * API with semantics that are richer than the semantics of the - * {@link net.jini.core.lookup.ServiceRegistrar#lookup lookup} methods - * provided by the {@link net.jini.core.lookup.ServiceRegistrar - * ServiceRegistrar}. This API encapsulates functionality that many - * client-like entities may find more useful when managing both the set - * of desired lookup services, and the service queries made on those - * lookup services. To employ this pattern, the entity simply - * instantiates this class with the desired parameters, and then - * invokes the appropriate version of the - * {@link net.jini.lookup.ServiceDiscoveryManager#lookup lookup} - * method when the entity wishes to acquire a service that matches - * desired criteria. - * </ul> - * <p> - * All three mechanisms just described - local queries on the cache, - * service discovery notification, and remote lookups - employ the same - * template-matching scheme as that employed in the - * {@link net.jini.core.lookup.ServiceRegistrar ServiceRegistrar} interface. - * Additionally, each mechanism allows the entity to supply an object - * referred to as a <i>filter</i>; an instance of - * {@link net.jini.lookup.ServiceItemFilter ServiceItemFilter}. A filter - * is a non-remote object that defines additional matching criteria that the - * <code>ServiceDiscoveryManager</code> applies when searching for the - * entity's services of interest. Employing a filter is particularly useful - * to entities that wish to extend the capabilities of the standard - * template-matching scheme. - * <p> - * In addition to (or instead of) employing a filter to apply additional - * matching criteria to candidate service proxies initially found through - * template matching, filters can also be used to extend the selection - * process so that only proxies that are <i>safe</i> to use are returned - * to the entity. To do this, the entity would use the - * {@link net.jini.lookup.ServiceItemFilter ServiceItemFilter} interface to - * supply the <code>ServiceDiscoveryManager</code> or - * {@link net.jini.lookup.LookupCache LookupCache} with a filter that, - * when applied to a candidate proxy, performs a set of operations that - * is referred to as <i>proxy preparation</i>. As described in the - * documentation for {@link net.jini.security.ProxyPreparer}, proxy - * preparation typically includes operations such as, verifying trust - * in the proxy, specifying client constraints, and dynamically granting - * necessary permissions to the proxy. - * <p> - * Note that this utility class is not remote. Clients and services that wish - * to use this class will create an instance of this class in their own address - * space to manage the state of discovered services and their associated - * lookup services locally. - * - * @com.sun.jini.impl <!-- Implementation Specifics --> - * - * The following implementation-specific items are discussed below: - * <ul><li> <a href="#sdmConfigEntries">Configuring ServiceDiscoveryManager</a> - * <li> <a href="#sdmLogging">Logging</a> - * </ul> - * - * <a name="sdmConfigEntries"> - * <p> - * <b><font size="+1">Configuring ServiceDiscoveryManager</font></b> - * <p> - * </a> - * - * This implementation of <code>ServiceDiscoveryManager</code> supports - * the following configuration entries; where each configuration entry - * name is associated with the component name - * <code>net.jini.lookup.ServiceDiscoveryManager</code>. Note that the - * configuration entries specified here are specific to this implementation - * of <code>ServiceDiscoveryManager</code>. Unless otherwise stated, each - * entry is retrieved from the configuration only once per instance of this - * utility, where each such retrieval is performed in the constructor. - * <p> - * It is important to note that in addition to allowing a client of this - * utility to request - through the public API - the creation of a cache - * that is used externally by the client, this utility also creates - * instances of the cache that are used internally by the utility itself. - * As such, in addition to the configuration entries that are used only - * in this utility (and not in any cache), and the configuration entries - * that are retrieved during the construction of each new cache (and used - * by only that cache), there are configuration entries specified below - * that are retrieved once during the construction of this utility, but - * which are shared with, and used by, the caches that are created. - * - * - * <a name="cacheTaskManager"> - * <table summary="Describes the cacheTaskManager configuration entry" - * border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>cacheTaskManager</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link com.sun.jini.thread.TaskManager} - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code>new - * {@link com.sun.jini.thread.TaskManager#TaskManager() - * TaskManager}(10, (15*1000), 1.0f)</code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> The object that pools and manages the various threads - * executed by each of the lookup caches created by this - * utility. There is one such task manager created for each - * cache. The default manager creates a maximum of 10 threads, - * waits 15 seconds before removing idle threads, and uses a - * load factor of 1.0 when determining whether to create a new - * thread. For each cache that is created in this utility, a - * single, separate instance of this task manager will be - * retrieved and employed by that cache. This object should - * not be shared with other components in the application that - * employs this utility. - * </table> - * - * <a name="discardTaskManager"> - * <table summary="Describes the discardTaskManager configuration entry" - * border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>discardTaskManager</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link com.sun.jini.thread.TaskManager} - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code>new - * {@link com.sun.jini.thread.TaskManager#TaskManager() - * TaskManager}(10, (15*1000), 1.0f)</code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> The object that pools and manages the threads, executed - * by a cache, that wait on verification events after a - * previousy discovered service has been discarded. The - * default manager creates a maximum of 10 threads, waits - * 15 seconds before removing idle threads, and uses a load - * factor of 1.0 when determining whether to create a new - * thread. For each cache that is created in this utility, - * a single, separate instance of this task manager will be - * retrieved and employed by that cache. This object should - * not be shared with other components in the application - * that employs this utility. - * </table> - * - * <a name="discardWait"> - * <table summary="Describes the discardWait - * configuration entry" border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>discardWait</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> <code>long</code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code>2*(5*60*1000)</code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> The value used to affect the behavior of the mechanism - * that handles the <i>service discard problem</i> described - * in this utility's specification. This item allows each - * entity that uses this utility to define how long (in - * milliseconds) to wait for verification from the lookup - * service(s) that a discarded service is actually down - * before committing or un-committing a requested service - * discard. The current implementation of this utility - * defaults to waiting 10 minutes (twice the maximum lease - * duration granted by the Reggie implementation of the - * lookup service). Note that this item is used only by the - * caches (both internal and external) that are created by - * this utility, and not by the utility itself. - * </table> - * - * <a name="discoveryManager"> - * <table summary="Describes the discoveryManager configuration entry" - * border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>discoveryManager</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link net.jini.discovery.DiscoveryManagement} - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code> new - * {@link net.jini.discovery.LookupDiscoveryManager#LookupDiscoveryManager( - * java.lang.String[], - * net.jini.core.discovery.LookupLocator[], - * net.jini.discovery.DiscoveryListener, - * net.jini.config.Configuration) LookupDiscoveryManager}( - * new java.lang.String[] {""}, - * new {@link net.jini.core.discovery.LookupLocator}[0], - * null, config)</code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> The object used to manage the discovery processing - * performed by this utility. This entry will be retrieved - * from the configuration only if no discovery manager is - * specified in the constructor. Note that this object should - * not be shared with other components in the application that - * employs this utility. This item is used only by the service - * discovery manager, and not by any cache that is created. - * </table> - * - * <a name="eventLeasePreparer"> - * <table summary="Describes the eventLeasePreparer configuration entry" - * border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>eventLeasePreparer</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link net.jini.security.ProxyPreparer} - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code>new {@link net.jini.security.BasicProxyPreparer}() - * </code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> Preparer for the leases returned when a cache registers - * with the event mechanism of any of the discovered lookup - * services. This item is used only by the caches (both - * internal and external) that are created by this utility, - * and not by the utility itself. - * <p> - * Currently, no methods of the returned proxy are invoked by - * this utility. - * </table> - * - * <a name="eventListenerExporter"> - * <table summary="Describes the eventListenerExporter configuration entry" - * border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>eventListenerExporter</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link net.jini.export.Exporter} - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code> new - * {@link net.jini.jeri.BasicJeriExporter#BasicJeriExporter( - * net.jini.jeri.ServerEndpoint, - * net.jini.jeri.InvocationLayerFactory, - * boolean, - * boolean) BasicJeriExporter}( - * {@link net.jini.jeri.tcp.TcpServerEndpoint#getInstance - * TcpServerEndpoint.getInstance}(0),<br> - *             - *             - * new {@link net.jini.jeri.BasicILFactory}(),<br> - *             - *             - * false, false)</code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> Exporter for the remote event listener that each cache - * supplies to the lookup services whose event mechanisms - * those caches register with. Note that for each cache that - * is created in this utility, a single, separate instance - * of this exporter will be retrieved and employed by that - * cache. Note also that the default exporter defined here - * will disable distributed garbage collection (DGC) for the - * server endpoint associated with the exported listener, - * and the listener backend (the "impl") will be strongly - * referenced. This means that the listener will not "go away" - * unintentionally. Additionally, that exporter also sets the - * <code>keepAlive</code> flag to <code>false</code> to allow - * the VM in which this utility runs to "go away" when - * desired; and not be kept alive simply because the listener - * is still exported. - * </table> - * - * <a name="leaseManager"> - * <table summary="Describes the leaseManager configuration entry" - * border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>leaseManager</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link net.jini.lease.LeaseRenewalManager} - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code> new - * {@link net.jini.lease.LeaseRenewalManager#LeaseRenewalManager( - * net.jini.config.Configuration) LeaseRenewalManager}(config)</code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> The object used to manage any event leases returned - * to a cache that has registered with the event mechanism - * of the various discovered lookup services. This entry will - * be retrieved from the configuration only if no lease - * renewal manager is specified in the constructor. This item - * is used only by the caches (both internal and external) - * that are created by this utility, and not by the utility - * itself. - * </table> - * - * <a name="registrarPreparer"> - * <table summary="Describes the registrarPreparer configuration entry" - * border="0" cellpadding="2"> - * <tr valign="top"> - * <th scope="col" summary="layout"> <font size="+1">•</font> - * <th scope="col" align="left" colspan="2"> <font size="+1"> - * <code>registrarPreparer</code></font> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link net.jini.security.ProxyPreparer} - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code>new {@link net.jini.security.BasicProxyPreparer}() - * </code> - * - * <tr valign="top"> <td>   <th scope="row" align="right"> - * Description: - * <td> Preparer for the proxies to the lookup services that are - * discovered and used by this utility. This item is used only - * by the service discovery manager, and not by any cache that - * is created. - * <p> - * The following methods of the proxy returned by this preparer are - * invoked by this utility: - * <ul> - * <li>{@link net.jini.core.lookup.ServiceRegistrar#lookup lookup} - * <li>{@link net.jini.core.lookup.ServiceRegistrar#notify notify} - * </ul> - * - * </table> - * - * <a name="sdmLogging"> - * <p> - * <b><font size="+1">Logging</font></b> - * <p> - * </a> - * - * This implementation of <code>ServiceDiscoveryManager</code> uses the - * {@link Logger} named <code>net.jini.lookup.ServiceDiscoveryManager</code> - * to log information at the following logging levels: <p> - * - * <table border="1" cellpadding="5" - * summary="Describes the information logged by ServiceDiscoveryManager, - * and the levels at which that information is logged"> - * - * - * <caption halign="center" valign="top"> - * <b><code>net.jini.lookup.ServiceDiscoveryManager</code></b> - * </caption> - * - * <tr> <th scope="col"> Level</th> - * <th scope="col"> Description</th> - * </tr> - * - * <tr> - * <td>{@link java.util.logging.Level#INFO INFO}</td> - * <td> - * when any exception occurs while querying a lookup service, or upon - * applying a filter to the results of such a query - * </td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#INFO INFO}</td> - * <td> - * when any exception occurs while attempting to register with the event - * mechanism of a lookup service, or while attempting to prepare the lease - * on the registration with that event mechanism - * </td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#INFO INFO}</td> - * <td>when any exception occurs while attempting to prepare a proxy</td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#INFO INFO}</td> - * <td> - * when an <code>IllegalStateException</code> occurs while discarding - * a lookup service proxy after logging a failure that has occurred in - * one of the tasks executed by this utility - * </td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#INFO INFO}</td> - * <td>upon failure of the lease renewal process</td> - * </tr> - * <tr> - * <td>{@link com.sun.jini.logging.Levels#HANDLED HANDLED}</td> - * <td> - * when an exception occurs because a remote call to a lookup service - * has been interrupted as a result of the termination of a cache - * </td> - * </tr> - * <tr> - * <td>{@link com.sun.jini.logging.Levels#HANDLED HANDLED}</td> - * <td> - * when a "gap" is encountered in an event sequence from a lookup service - * </td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#FINER FINER}</td> - * <td>upon failure of the lease cancellation process</td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#FINEST FINEST}</td> - * <td>whenever any task is started</td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#FINEST FINEST}</td> - * <td>whenever any task completes successfully</td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#FINEST FINEST}</td> - * <td>whenever a lookup cache is created</td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#FINEST FINEST}</td> - * <td>whenever a lookup cache is terminated</td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#FINEST FINEST}</td> - * <td>whenever a proxy is prepared</td> - * </tr> - * <tr> - * <td>{@link java.util.logging.Level#FINEST FINEST}</td> - * <td> - * when an exception (that is, <code>IllegalStateException</code>) - * occurs while unexporting a cache's remote event listener while the - * cache is being terminated - * </td> - * </tr> - * </table> - * <p> - * See the {@link com.sun.jini.logging.LogManager} class for one way to use - * the logging level {@link com.sun.jini.logging.Levels#HANDLED HANDLED} in - * standard logging configuration files. - * <p> - * - * @author Sun Microsystems, Inc. - * - * @see net.jini.discovery.DiscoveryManagement - * @see net.jini.lookup.LookupCache - * @see net.jini.lookup.ServiceDiscoveryListener - * @see net.jini.lookup.ServiceDiscoveryEvent - * @see net.jini.core.lookup.ServiceRegistrar - */ -public class ServiceDiscoveryManager { - - /** Class for implementing register/lookup/notify/dropProxy/discard tasks*/ - private static abstract class CacheTask implements Runnable { - protected final ProxyReg reg; - protected volatile long thisTaskSeqN; - public CacheTask(ProxyReg reg, long seqN) { - this.reg = reg; - this.thisTaskSeqN = seqN; - }//end constructor - /* check if the task is on a specific ProxyReg, return true if it is */ - public boolean isFromProxy(ProxyReg reg) { - if(this.reg == null) return false; - return (this.reg).equals(reg); - }//end isFromProxy - - /** Returns the ProxyReg associated with this task (if any). */ - public ProxyReg getProxyReg() { - return reg; - }//end ProxyReg - - /** Returns the unique sequence number of this task. */ - public long getSeqN() { - return thisTaskSeqN; - }//end getSeqN - - public abstract void run(); - - public abstract boolean hasDeps(); - - public boolean dependsOn(CacheTask task){ - return false; - } - }//end class ServiceDiscoveryManager.CacheTask - - /** Abstract base class for controlling the order-of-execution of tasks - * corresponding to a particular serviceID associated with a particular - * lookup service. - */ - private static abstract class ServiceIdTask extends CacheTask { - protected final ServiceID thisTaskSid; - ServiceIdTask(ServiceID srvcId, ProxyReg reg, long seqN) { - super(reg, seqN); - this.thisTaskSid = srvcId; - }//end constructor - - /** Returns true if the current instance of this task must be run - * after at least one task in task manager queue. - * - * The criteria for determining what value to return is as follows: - * - * If there is at least one task in the given task list that is - * associated with the same serviceID as this task, and that task - * has a sequence number less than the sequence number of this task, - * then run this task *after* the task in the list (return true); - * otherwise run this task now (return false). - * - */ - @Override - public boolean dependsOn(CacheTask t) { - //Compare only instances of this task class - if( !(t instanceof ServiceIdTask) ) return false; - ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID(); - if( thisTaskSid.equals(otherTaskSid) ) { - if(thisTaskSeqN > ((ServiceIdTask)t).getSeqN()) { - return true;//run this task after the other task - }//endif - }//endif - return false;//run this task now - }//end dependsOn - - @Override - public boolean hasDeps(){ - return true; - } - - /** Returns the ServiceID associated with this task. */ - public ServiceID getServiceID() { - return thisTaskSid; - }//end getServiceID - - }//end class ServiceIdTask - - /** Class that defines the listener that will receive local events from - * the internal LookupCache used in the blocking versions of lookup(). - */ - private final static class ServiceDiscoveryListenerImpl - implements ServiceDiscoveryListener - { - private final List items = new LinkedList(); - - @Override - public synchronized void serviceAdded(ServiceDiscoveryEvent event) { - items.add(event.getPostEventServiceItem()); - this.notifyAll(); - } - @Override - public void serviceRemoved(ServiceDiscoveryEvent event){ } - @Override - public void serviceChanged(ServiceDiscoveryEvent event){ } - public synchronized ServiceItem[] getServiceItem() { - ServiceItem[] r = new ServiceItem[items.size()]; - items.toArray(r); - items.clear(); - return r; - } - }//end class ServiceDiscoveryManager.ServiceDiscoveryListenerImpl - - /** - * Data structure used to group together the lease and event sequence - * number. For each LookupCache, there is a HashMap that maps a ProxyReg - * to an EventReg. - */ - private final static class EventReg { - /* The Event source from the event registration */ - final Object source; - /* The Event ID */ - public final long eventID; - /* The current event sequence number for the Service template */ - private long seqNo; - /* The Event notification lease */ - public final Lease lease; - /* The pending events */ - private final List<LookupCacheImpl.NotifyEventTask> pending; - /* The number of pending LookupTasks */ - private int lookupsPending; - - public EventReg(Object source, long eventID, long seqNo, Lease lease) { - this.source = source; - this.eventID = eventID; - this.seqNo = seqNo; - this.lease = lease; - pending = new ArrayList<LookupCacheImpl.NotifyEventTask>(); - this.lookupsPending = 0; - } - - /** - * @param seqNo the seqNo to set, return a positive delta if - * successful, otherwise a negative value indicates failure. - */ - public synchronized long updateSeqNo(long seqNo) { - long thisSeqNo = this.seqNo; - if (seqNo > this.seqNo){ - this.seqNo = seqNo; - return thisSeqNo - seqNo; - } else { - return thisSeqNo - seqNo; // Return a negative value - } - } - - public synchronized boolean addIfLookupsPending(CacheTask task){ - if (task instanceof LookupCacheImpl.NotifyEventTask){ - if (getLookupsPending() > 0){ - pending.add((LookupCacheImpl.NotifyEventTask) task); - } - } - return false; - } -// -// public synchronized void addLookupPending(ObservableFutureTask task){ -// task.addObserver(this); -// pendingLookups.add(task); -// } - - /** - * @return the lookupsPending - */ - public synchronized int getLookupsPending() { - return lookupsPending; - } - - public synchronized void decrementLookupsPending(){ - if (lookupsPending > 0) { - lookupsPending--; - } - } - - public synchronized void incrementLookupsPending(){ - lookupsPending++; - } - }//end class ServiceDiscoveryManager.EventReg - - /** - * Used in the LookupCache. For each LookupCache, there is a HashMap that - * maps ServiceId to a ServiceItemReg. The ServiceItemReg class helps - * track where the ServiceItem comes from. - */ - private final static class ServiceItemReg { - /* Maps ServiceRegistrars to their latest registered item */ - private final Map<ServiceRegistrar,ServiceItem> items; - /* The ServiceRegistrar currently being used to track changes */ - private ServiceRegistrar proxy; - /* Flag that indicates that the ServiceItem has been discarded. */ - private final boolean bDiscarded; - /* The discovered service, prior to filtering. */ - public final ServiceItem item; - /* The discovered service, after filtering. */ - public final ServiceItem filteredItem; - /* Creates an instance of this class, and associates it with the given - * lookup service proxy. - */ - public ServiceItemReg(ServiceRegistrar proxy, ServiceItem item) { - this.bDiscarded = false; - items = new HashMap<ServiceRegistrar,ServiceItem>(); - this.proxy = proxy; - items.put(proxy, item); - this.item = item; - filteredItem = null; - } - - public ServiceItemReg(ServiceItemReg reg, ServiceRegistrar proxy, ServiceItem item, ServiceItem filteredItem, boolean discarded){ - items = reg.items; - this.proxy = proxy != null? proxy : reg.proxy; - this.item = item; - this.filteredItem = filteredItem; - bDiscarded = discarded; - } - /* Adds the given proxy to the 'proxy-to-item' map. This method is - * called from this class' constructor, LookupTask, NotifyEventTask, - * and ProxyRegDropTask. Returns false if the proxy is being used - * to track changes, true otherwise. - */ - public boolean proxyNotUsedToTrackChange(ServiceRegistrar proxy, ServiceItem item) { - synchronized (items){ - items.put(proxy, item); - return !proxy.equals(this.proxy); - } - } - /* Removes the given proxy from the 'proxy-to-item' map. This method - * is called from NotifyEventTask and ProxyRegDropTask. If this proxy - * was being used to track changes, then pick a new one and return - * its current item, else return null. - */ - public ServiceItem removeProxy(ServiceRegistrar proxy) { - synchronized (items){ - items.remove(proxy); - if (proxy.equals(this.proxy)) { - if (items.isEmpty()) { - this.proxy = null; - } else { - Map.Entry ent = (Map.Entry) items.entrySet().iterator().next(); - this.proxy = (ServiceRegistrar) ent.getKey(); - return (ServiceItem) ent.getValue(); - }//endif - }//endif - } - return null; - } - /* Determines if the 'proxy-to-item' map contains any mappings. - * This method is called from NotifyEventTask and ProxyRegDropTask. - */ - public boolean hasNoProxys() { - synchronized (items){ - return items.isEmpty(); - } - } - /* Returns the flag indicating whether the ServiceItem is discarded. */ - public boolean isDiscarded() { - return bDiscarded; - } - - /** - * @return the proxy - */ - public ServiceRegistrar getProxy() { - synchronized (items){ - return proxy; - } - } - } - /** A wrapper class for a ServiceRegistrar. */ - private final static class ProxyReg { - private final ServiceRegistrar proxy; - private final int hash; - public ProxyReg(ServiceRegistrar proxy) { - if(proxy == null) throw new IllegalArgumentException - ("proxy cannot be null"); - this.proxy = proxy; - hash = proxy.hashCode(); - }//end constructor - - @Override - - public boolean equals(Object obj) { - if (obj instanceof ProxyReg){ - return getProxy().equals(((ProxyReg)obj).getProxy()); - } else return false; - }//end equals - - @Override - public int hashCode() { - return hash; - }//end hashCode - - /** - * @return the proxy - */ - public ServiceRegistrar getProxy() { - return proxy; - } - - }//end class ServiceDiscoveryManager.ProxyReg - - /** The Listener class for the LeaseRenewalManager. */ - private final class LeaseListenerImpl implements LeaseListener { - private final ServiceRegistrar proxy; - public LeaseListenerImpl(ServiceRegistrar proxy) { - this.proxy = proxy; - } - /* When lease renewal fails, we discard the proxy */ - public void notify(LeaseRenewalEvent e) { - fail(e.getException(),proxy, this.getClass().getName(), "notify", - "failure occurred while renewing an event lease", false); - } - }//end class ServiceDiscoveryManager.LeaseListenerImpl - - /** Allows termination of LookupCacheImpl so blocking lookup can return - * quickly - */ - private static final class LookupCacheTerminator implements Runnable { - private final BlockingQueue<LookupCacheImpl> cacheList = new LinkedBlockingQueue<LookupCacheImpl>(20); - - - public void run(){ - while (!Thread.currentThread().isInterrupted()) { - try { - LookupCacheImpl cache = cacheList.take(); - synchronized (cache){ - cache.terminate(); - } - } catch (InterruptedException ex) { - logger.log(Level.FINE, "SDM lookup cache terminator interrupted", ex); - break; - } - } - } - - void terminate(LookupCacheImpl cache){ - boolean added = cacheList.offer(cache); - if (!added) { // happens if cacheList is full. - // Do it yourself you lazy caller thread! Can't you see I'm busy? - synchronized (cache){ - cache.terminate(); - } - } - } - - } - - /** - * - * @param <T> - */ - static final class CacheTaskWrapper<T> extends ObservableFutureTask<T> - { - private final CacheTask task; - - private CacheTaskWrapper(Runnable r, T result) { - super(r, result); - if (r instanceof CacheTask) task = (CacheTask)r; - else task = null; - } - - private CacheTaskWrapper(Callable<T> c) - { - super(c); - if (c instanceof CacheTask) task = (CacheTask)c; - else task = null; - } - - private CacheTask getTask(){ - return task; - } - } - - private static final class CacheTaskQueue implements FutureObserver { - // CacheTasks pending completion. - private final ConcurrentLinkedQueue<CacheTaskWrapper> pending; - private final ExecutorService executor; - - private CacheTaskQueue(ExecutorService e){ - this.pending = new ConcurrentLinkedQueue<CacheTaskWrapper>(); - executor = e; - } - - private CacheTaskWrapper submit(CacheTask t){ - CacheTaskWrapper future = new CacheTaskWrapper(t, null); - pending.offer(future); - future.addObserver(this); - try { - Thread.sleep(100L); //Brief sleep to allow deps to find each other. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); // restore. - } - if (t.hasDeps()) { - List<ObservableFuture> deps = new ArrayList<ObservableFuture>(); - Iterator<CacheTaskWrapper> it = pending.iterator(); - while (it.hasNext()){ - CacheTaskWrapper w = it.next(); - CacheTask c = w.getTask(); - if (t.dependsOn(c)) { - deps.add(w); - } - } - if (deps.isEmpty()){ - executor.submit(future); - } else { - DependencyLinker linker = new DependencyLinker(executor, deps, future); - linker.register(); - } - } else { - executor.submit(future); - } - return future; - } - - @Override - public void futureCompleted(Future e) { - pending.remove(e); - } - - /** Removes from the cache's task queue, all pending tasks - * associated with the given ProxyReg. This method is called - * when the given ProxyReg has been discarded. - */ - private void removeUselessTask(ProxyReg reg) { - Iterator<CacheTaskWrapper> it = pending.iterator(); - while( it.hasNext()) { - CacheTaskWrapper w = it.next(); - CacheTask t = w.getTask(); - if(t.isFromProxy(reg)) { - w.cancel(true); // Also causes task to be removed - } - }//end loop - }//end LookupCacheImpl.removeUselessTask - } - - private static class RemoteEventComparator implements Comparator<RemoteEvent>{ - - @Override - public int compare(RemoteEvent o1, RemoteEvent o2) { - boolean Null1 = o1 == null; - boolean Null2 = o2 == null; - if (Null1 && Null2) return 0; - if (Null1 && !Null2) return -1; - if (!Null1 && Null2) return 1; - long eventID1 = o1.getID(); - long eventID2 = o2.getID(); - if (eventID1 < eventID2) return -1; - if (eventID1 > eventID2) return 1; - long seq1 = o1.getSequenceNumber(); - long seq2 = o2.getSequenceNumber(); - if (seq1 < seq2) return -1; - if (seq1 > seq2) return 1; - return 0; - } - - } - - /** Internal implementation of the LookupCache interface. Instances of - * this class are used in the blocking versions of lookup() and are - * returned by createLookupCache. - */ - private static final class LookupCacheImpl implements LookupCache { - - /* RemoteEventListener class that is registered with the proxy to - * receive notifications from lookup services when any ServiceItem - * changes (NOMATCH_MATCH, MATCH_NOMATCH, MATCH_MATCH) - */ - private final class LookupListener implements RemoteEventListener, - ServerProxyTrust - { - /* Buffer to avoid unnecessary lookup task calls, used for - * reordering out of order events. */ - private final BlockingQueue<RemoteEvent> eventQueue; - - public LookupListener() { - this.eventQueue = - new PriorityBlockingQueue<RemoteEvent>( - 11, - new RemoteEventComparator() - ); - }//end constructor - - // Moved export from constructor and LookupCacheImpl constructor - // to avoid publishing partially constructed object. - RemoteEventListener export() throws ExportException { - return (RemoteEventListener)lookupListenerExporter.export(this); - } - - @Override - public void notify(RemoteEvent evt) throws UnknownEventException, - java.rmi.RemoteException { - if (!(evt instanceof ServiceEvent)) throw - new UnknownEventException("ServiceEvent required,not: " - + evt.toString()); - eventQueue.offer(evt); - ServiceEvent theEvent = null; - try { - Thread.sleep(200L); - theEvent = (ServiceEvent) eventQueue.poll(); - } catch (InterruptedException ex) { - // Restore Interrupt. - Thread.currentThread().interrupt(); - if (theEvent == null) throw new RemoteException( - "RemoteEventListener interrupted while processing event" - ); - } - notifyServiceMap( theEvent.getSource(), - theEvent.getID(), - theEvent.getSequenceNumber(), - theEvent.getServiceID(), - theEvent.getServiceItem(), - theEvent.getTransition() ); - } - /** Returns a <code>TrustVerifier</code> which can be used to - * verify that a given proxy to this listener can be trusted. - */ - public TrustVerifier getProxyVerifier() { - return new BasicProxyTrustVerifier(lookupListenerProxy); - }//end getProxyVerifier - }//end class LookupCacheImpl.LookupListener - - /** This task class, when executed, first registers to receive - * ServiceEvents from the given ServiceRegistrar. If the registration - * process succeeds (no RemoteExceptions), it then executes the - * LookupTask to query the given ServiceRegistrar for a "snapshot" - * of its current state with respect to services that match the - * given template. - * - * Note that the order of execution of the two tasks is important. - * That is, the LookupTask must be executed only after registration - * for events has completed. This is because when an entity registers - * with the event mechanism of a ServiceRegistrar, the entity will - * only receive notification of events that occur "in the future", - * after the registration is made. The entity will not receive events - * about changes to the state of the ServiceRegistrar that may have - * occurred before or during the registration process. - * - * Thus, if the order of these tasks were reversed and the LookupTask - * were to be executed prior to the RegisterListenerTask, then the - * possibility exists for the occurrence of a change in the - * ServiceRegistrar's state between the time the LookupTask retrieves - * a snapshot of that state, and the time the event registration - * process has completed, resulting in an incorrect view of the - * current state of the ServiceRegistrar. - */ - private static final class RegisterListenerTask extends CacheTask - implements Subscribeable { - private final LookupCacheImpl cache; - private Subscriber subscriber; - public RegisterListenerTask( - ProxyReg reg, - long seqN, - LookupCacheImpl cache) - { - super(reg, seqN); - this.cache = cache; - } - - public boolean hasDeps(){ - return true; - } - - public boolean dependsOn(CacheTask t){ - if (t instanceof ProxyRegDropTask ){ - ProxyReg r = getProxyReg(); - if (r != null && r.equals(t.getProxyReg())){ - if (t.getSeqN() < getSeqN()) return true; - } - } - return false; - } - - public void run() { - logger.finest("ServiceDiscoveryManager - RegisterListenerTask " - +"started"); - long duration = cache.getLeaseDuration(); - if(duration < 0) return; - try { - EventReg eventReg = cache.sdm.registerListener(reg.getProxy(), - cache.tmpl, - cache.lookupListenerProxy, - duration); - // eventReg is a new object not visible to other threads yet. - eventReg.incrementLookupsPending(); - /* Cancel the lease if the cache has been terminated */ - if(cache.bCacheTerminated || Thread.currentThread().isInterrupted()) { - // eventReg.lease is final and is already safely published - cache.sdm.cancelLease(eventReg.lease); - return; - } else { - // eventReg will be published safely to other threads - // via eventRegMap and LookupTask - EventReg existed = cache.eventRegMap.putIfAbsent(reg, eventReg); - if (existed == null ) { - ObservableFuture task = - cache.taskQueue.submit( - new LookupTask(reg, this.getSeqN(), eventReg, cache) - ); - synchronized (this){ - if (subscriber != null) subscriber.reccommendedViewing(task); - } - } else { - // Another eventReg.lease exists, cancel new lease. - cache.sdm.cancelLease(eventReg.lease); - } - }//endif - /* Execute the LookupTask only if there were no problems */ - - } catch (Exception e) { - cache.sdm.fail - (e, reg.getProxy(),this.getClass().getName(),"run", - "Exception occurred while attempting to register " - +"with the lookup service event mechanism", - cache.bCacheTerminated); - } - logger.finest("ServiceDiscoveryManager - RegisterListenerTask " - +"completed"); - }//end run - - @Override - public synchronized void subscribe(Subscriber subscriber) { - this.subscriber = subscriber; - } - }//end class LookupCacheImpl.RegisterListenerTask - - /** This class requests a "snapshot" of the given registrar's state.*/ - private static final class LookupTask extends CacheTask implements Subscribeable { - private final EventReg eReg; - private final LookupCacheImpl cache; - private Subscriber subscriber; - public LookupTask(ProxyReg reg, long seqN, EventReg eReg, LookupCacheImpl cache) { - super(reg, seqN); - this.eReg = eReg; - this.cache = cache; - } - public void run() { - logger.finest("ServiceDiscoveryManager - LookupTask started"); - ServiceRegistrar proxy = reg.getProxy(); - ServiceMatches matches; - /* For the given lookup, get all services matching the tmpl */ - try { - matches = proxy.lookup(cache.tmpl, Integer.MAX_VALUE); - } catch (Exception e) { - // ReRegisterGoodEquals test failure becomes more predictable - // when fail is only called if decrement is successful. - eReg.decrementLookupsPending(); - cache.sdm.fail - (e,proxy,this.getClass().getName(),"run", - "Exception occurred during call to lookup", - cache.bCacheTerminated); - return; - } - if(matches.items == null) { - throw new AssertionError("spec violation in queried " - +"lookup service: ServicesMatches instance " - +"returned by call to lookup() method contains " - +"null 'items' field"); - } - /* 1. Cleanup "orphaned" itemReg's. */ - Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter - = cache.serviceIdMap.entrySet().iterator(); - while(iter.hasNext()) { - Map.Entry<ServiceID,ServiceItemReg> e = iter.next(); - ServiceID srvcID = e.getKey(); - ServiceItem itemInSnapshot = cache.findItem(srvcID, - matches.items); - if(itemInSnapshot != null) continue;//not an orphan - if (Thread.currentThread().isInterrupted()) continue; // skip - ServiceItemReg itemReg = e.getValue(); - CacheTask t = - new UnmapProxyTask( - reg, - itemReg, - srvcID, - getSeqN(), - cache - ); - ObservableFuture task = cache.taskQueue.submit(t); - synchronized (this){ - if (subscriber != null) subscriber.reccommendedViewing(task); - } - }//end loop - /* 2. Handle "new" and "old" items from the given lookup */ - int l = (matches.items).length; - for(int i=0; i<l; i++) { - /* Skip items with null service field (Bug 4378751) */ - if( (matches.items[i]).service == null ) continue; -// if (Thread.currentThread().isInterrupted()) continue; // skip - CacheTask t = - new NewOldServiceTask( - reg, - matches.items[i], - false, - getSeqN(), - cache - ); - - ObservableFuture task = cache.taskQueue.submit(t); - synchronized (this){ - if (subscriber != null) subscriber.reccommendedViewing(task); - } - }//end loop - /* 3. Handle events that came in prior to lookup */ - synchronized (eReg){ - eReg.decrementLookupsPending(); - if (eReg.getLookupsPending() == 0){ - Iterator it = eReg.pending.iterator() ; - while (it.hasNext()) { - if (Thread.currentThread().isInterrupted()) continue; // skip - NotifyEventTask t = (NotifyEventTask) it.next(); - t.thisTaskSeqN = cache.taskSeqN.getAndIncrement(); // assign new seqN - cache.taskQueue.submit(t); - } - eReg.pending.clear(); - } - }//end sync(eReg) - logger.finest("ServiceDiscoveryManager - LookupTask " - +"completed"); - }//end run - - /** Returns true if the current instance of this task must be run - * after at least one task in the task manager queue. - * - * The criteria for determining what value to return: - * - * If the task list contains any RegisterListenerTasks, - * other LookupTasks, or NotifyEventTasks associated with - * this task's lookup service (ProxyReg), if those tasks - * were queued prior to this task (have lower sequence - * numbers), then run those tasks before this task (return - * true). Otherwise this task can be run immediately - * (return false). - * - * This method was added to address Bug ID 6291851. - * - * @param tasks the tasks to consider. - * @param size elements with index less than size are considered. - */ - public boolean dependsOn(CacheTask t) { - if( t instanceof RegisterListenerTask - || t instanceof LookupTask - || t instanceof NotifyEventTask ) - { - ProxyReg otherReg = t.getProxyReg(); - if( reg.equals(otherReg) ) { - if(thisTaskSeqN > t.getSeqN()) return true; - }//endif - }//endif - return false; - }//end dependsOn - - public boolean hasDeps(){ - return true; - } - - @Override - public synchronized void subscribe(Subscriber subscriber) { - this.subscriber = subscriber; - } - - }//end class LookupCacheImpl.LookupTask - - /** When the given registrar is discarded, this Task class is used to - * remove the registrar from the various maps maintained by this - * cache. - */ - private static final class ProxyRegDropTask extends CacheTask - implements Subscribeable { - - private final LookupCacheImpl cache; - private final EventReg eReg; - private Subscriber subscriber; - - public ProxyRegDropTask( - ProxyReg reg, EventReg eReg, long seqN, LookupCacheImpl cache) - { - super(reg, seqN); - this.cache = cache; - this.eReg = eReg; - } - public void run() { - logger.finest("ServiceDiscoveryManager - ProxyRegDropTask " - +"started"); - //lease has already been cancelled by removeProxyReg - cache.eventRegMap.remove(reg, eReg); - /* For each itemReg in the serviceIdMap, disassociate the - * lookup service referenced here from the itemReg; and - * if the itemReg then has no more lookup services associated - * with it, remove the itemReg from the map and send a - * service removed event. - */ - Iterator iter = (cache.serviceIdMap.entrySet()).iterator(); - while(iter.hasNext()) { - Map.Entry e = (Map.Entry)iter.next(); - ServiceID srvcID = (ServiceID)e.getKey(); - ServiceItemReg itemReg = (ServiceItemReg)e.getValue(); - UnmapProxyTask t = - new UnmapProxyTask( - reg, - itemReg, - srvcID, - getSeqN(), - cache); - ObservableFuture task = cache.taskQueue.submit(t); - synchronized (this){ - if (subscriber != null) subscriber.reccommendedViewing(task); - } - }//end loop - logger.finest("ServiceDiscoveryManager - ProxyRegDropTask " - +"completed"); - }//end run - - @Override - public boolean hasDeps(){ - return true; - } - - public boolean dependsOn(CacheTask t){ - if (t instanceof RegisterListenerTask ){ - ProxyReg r = getProxyReg(); - if (r != null && r.equals(t.getProxyReg())){ - if (t.getSeqN() < getSeqN()) return true; - } - } - return false; - } - - @Override - public synchronized void subscribe(Subscriber subscriber) { - this.subscriber = subscriber; - } - - }//end class LookupCacheImpl.ProxyRegDropTask - - /** Task class used to asynchronously notify service discard. */ - private final class DiscardServiceTask extends CacheTask { - private final ServiceItem item; - public DiscardServiceTask(ServiceItem item) { - super(null, 0); - this.item = item; - } - - public boolean hasDeps(){ - return false; - } - - public void run() { - logger.finest("ServiceDiscoveryManager - DiscardServiceTask " - +"started"); - removeServiceNotify(item); - logger.finest("ServiceDiscoveryManager - DiscardServiceTask " - +"completed"); - }//end run - }//end class LookupCacheImpl.DiscardServiceTask - - /** Task class used to asynchronously notify all registered service - * discovery listeners of serviceAdded/serviceRemoved/serviceChanged - * events. - */ - private static class NotifyEventTask extends ServiceIdTask - implements Subscribeable { - private final ServiceID sid; - private final ServiceItem item; - private final int transition; - private final LookupCacheImpl cache; - private Subscriber subscriber; - public NotifyEventTask(LookupCacheImpl cache, - ProxyReg reg, - ServiceID sid, - ServiceItem item, - int transition, - long seqN) - { - super(sid, reg, seqN); - this.sid = sid; - this.item = item; - this.transition = transition; - this.cache = cache; - }//end constructor - - public void run() { - logger.finest("ServiceDiscoveryManager - NotifyEventTask " - +"started"); - /* Fix for Bug ID 4378751. The conditions described by that - * bug involve a ServiceItem (corresponding to a previously - * discovered service ID) having a null service field. A - * null service field is due to an UnmarshalException caused - * by a SecurityException that results from the lack of a - * connection permission for the lookup service codebase - * to the service's remote codebase. Skip this ServiceItem, - * otherwise an un-expected serviceRemoved event will result - * because the primary if-block will be unintentionally - * entered due to the null service field in the ServiceItem. - */ - if( ((item != null) && (item.service == null))) return; - - /* Handle the event by the transition type, and by whether - * the associated ServiceItem is an old, previously discovered - * item, or a newly discovered item. - */ - if(transition == ServiceRegistrar.TRANSITION_MATCH_NOMATCH) { - cache.handleMatchNoMatch(reg.getProxy(), sid); - } else {//(transition == NOMATCH_MATCH or MATCH_MATCH) - ObservableFuture task = cache.taskQueue.submit( - new NewOldServiceTask(reg, item, - (transition == ServiceRegistrar.TRANSITION_MATCH_MATCH), - thisTaskSeqN, cache)); - synchronized (this){ - if (subscriber != null) subscriber.reccommendedViewing(task); - } - }//endif(transition) - logger.finest("ServiceDiscoveryManager - NotifyEventTask " - +"completed"); - }//end run - - /** Returns true if the current instance of this task must be run - * after at least one task in the task manager queue. - * - * The criteria for determining what value to return: - * - * If the task list contains any RegisterListenerTasks - * or LookupTasks associated with this task's lookup service - * (ProxyReg), and if those tasks were queued prior to this - * task (have lower sequence numbers), then run those tasks - * before this task (return true). - * - * Additionally, if the task list contains any other - * ServiceIdTasks associated with this task's service ID - * which were queued prior to this task, then run those - * tasks before this task. - * - * If the criteria outlined above is not satisfied, then this - * task can be run immediately (return false). - * - * This method was added to address Bug ID 6291851. - * - * @param tasks the tasks to consider. - * @param size elements with index less than size are considered. - */ - public boolean dependsOn(CacheTask t) { - if( t instanceof RegisterListenerTask - || t instanceof LookupTask ) - { - ProxyReg otherReg = t.getProxyReg(); - if( reg.equals(otherReg) ) { - if(thisTaskSeqN > t.getSeqN()) return true; - }//endif - }//endif - return super.dependsOn(t); - }//end runAfter - - public boolean hasDeps(){ - return true; - } - - @Override - public synchronized void subscribe(Subscriber subscriber) { - this.subscriber = subscriber; - } - - }//end class LookupCacheImpl.NotifyEventTask - - /** Task class used to determine whether or not to "commit" a service - * discard request, increasing the chances that the service will - * eventually be re-discovered. This task is also used to attempt - * a filter retry on an item in which the cache's filter initially - * returned indefinite. - */ - private static final class ServiceDiscardTimerTask implements Runnable - { - private final ServiceID serviceID; - private final long endTime; - private final LookupCacheImpl cache; - public ServiceDiscardTimerTask( LookupCacheImpl cache, ServiceID serviceID) { - this.serviceID = serviceID; - this.cache = cache; - this.endTime = cache.sdm.discardWait + System.currentTimeMillis(); - }//end constructor - public void run(){ - logger.finest("ServiceDiscoveryManager - " - +"ServiceDiscardTimerTask started"); - /* Exit if this cache has already been terminated. */ - if(cache.bCacheTerminated) return; - /* Simply return if a MATCH_NOMATCH event arrived for this - * item prior to this task running and as a result, the item - * was removed from the map. - */ - if(!cache.serviceIdMap.containsKey(serviceID)) return; - long curDur = endTime-System.currentTimeMillis(); - synchronized(cache.serviceDiscardMutex) { - /* Wait until interrupted or time expires */ - while(curDur > 0) { - try { - cache.serviceDiscardMutex.wait(curDur); - } catch(InterruptedException e){ } - /* Exit if this cache was terminated while waiting. */ - if(cache.bCacheTerminated) return; - /* Either the wait period has completed or has been - * interrupted. If the service ID is no longer in - * in the serviceIdMap, then it's assumed that a - * MATCH_NOMATCH event must have arrived which could be - * viewed as an indication that the service's lease - * expired, which then could be interpreted as meaning - * the service is actually down, and will be - * re-discovered when it comes back on line. In that - * case, exit the thread. - */ - if(!cache.serviceIdMap.containsKey(serviceID)) return; - curDur = endTime-System.currentTimeMillis(); - }//end loop - }//end sync - cache.taskQueue.submit(new ServiceDiscardRetryFilterTask(cache, serviceID, null, cache.taskSeqN.incrementAndGet())); - }//end run - }//end class LookupCacheImpl.ServiceDiscardTimerTask - - private static class ServiceDiscardRetryFilterTask extends ServiceIdTask { - private final LookupCacheImpl cache; - ServiceDiscardRetryFilterTask(LookupCacheImpl cache, ServiceID srvcId, ProxyReg reg, long seqN) { - super(srvcId, reg, seqN); - this.cache = cache; - } - - @Override - public void run() { - /* The thread was not interrupted, time expired. - * - * If the service ID is still contained in the serviceIdMap, - * then a MATCH_NOMATCH event did not arrive, which is - * interpreted here to mean that the service is still up. - * The service ID will still be in the map if one (or both) - * of the following is true: - * - the client discarded an unreachable service that never - * actually went down (so it's lease never expired, and - * a MATCH_NOMATCH event was never received) - * - upon applying the cache's filter to the service, the - * filter returned indefinite, and this task was queued - * to request that filtering be retried at a later time - * - * For the first case above, the service is "un-discarded" so - * the service will be available to the client again. For the - * second case, the filter is retried. If the service passes - * the filter, the service is "un-discarded"; otherwise, it is - * 'quietly' removed from the map (because a service removed - * event was already sent when the service was originally - * discarded. - */ - ServiceItemReg itemReg = cache.serviceIdMap.get(thisTaskSid); - if(itemReg != null) { - if(!itemReg.isDiscarded()) return; - ServiceItem item; - ServiceItem filteredItem = null; - ServiceItem itemToSend = itemReg.filteredItem; - if(itemToSend == null) { - item = new ServiceItem - ( (itemReg.item).serviceID, - (itemReg.item).service, - (itemReg.item).attributeSets); - filteredItem = new ServiceItem - ( (itemReg.item).serviceID, - (itemReg.item).service, - (itemReg.item).attributeSets); - //retry the filter - if( cache.sdm.filterPassed(filteredItem,cache.filter) ) { - cache.addFilteredItemToMap(item,filteredItem); // "un-discards" - itemToSend = filteredItem; - } else {//'quietly' remove the item - cache.removeServiceIdMapSendNoEvent(thisTaskSid, itemReg); - return; - }//endif - }//endif - /* Either the filter was retried and passed, in which case, - * the filtered itemCopy was placed in the map and - * "un-discarded"; or the - * filter wasn't applied above (a non-null filteredItem - * field in the itemReg in the map means that the filter - * was applied at some previous time). In the latter case, the - * service can now be "un-discarded", and a notification - * that the service is now available can be sent for either case. - */ - if (filteredItem == null){ - boolean replaced = false; - while (!replaced){ - ServiceItemReg replacementItemReg = new ServiceItemReg(itemReg, itemReg.getProxy(), itemReg.item, itemReg.filteredItem, false); - replaced = cache.serviceIdMap.replace(thisTaskSid, itemReg, replacementItemReg); - if (replaced) break; - ServiceItemReg existed = cache.serviceIdMap.putIfAbsent(thisTaskSid, replacementItemReg); - if (existed != null){ - if (!existed.isDiscarded()) return; - itemReg = existed; - } else { - break; // Added - } - } - } - cache.addServiceNotify(itemToSend); - }//endif - logger.finest("ServiceDiscoveryManager - " - +"ServiceDiscardTimerTask completed"); - }//end run - }//end class LookupCacheImpl.ServiceDiscardTimerTask - - /** Task class used to asynchronously process the service state - * ("snapshot"), matching this cache's template, that was retrieved - * from the given lookup service. - * - * After retrieving the snapshot S, the LookupTask queues an instance - * of this task for each service referenced in S. This task determines - * if the given service is an already-discovered service (is currently - * in this cache's serviceIdMap), or is a new service. This task - * handles the service differently, depending on whether the service - * is a new or old. - * - * a. if the item is old, then this task will: - * - compare the given item from the snapshot to the UN-filtered - * item in given itemReg - * if(same version but attributes have changed) - * send changed event - * else if( version has changed ) - * send removed event followed by added event - * else - * do nothing - * - apply the filter to the given item - * if(filter fails) - * send removed event - * else if(filter passes) - * set the filtered item in the itemReg in the map - * else if (filter is indefinite) - * discard item - * send removed event - * queue another filter attempt for later - * b. if the given item is newly discovered, then this task will: - * - create a new ServiceItemReg containing the given item - * - place the new itemReg in the serviceIdMap - * - apply the filter to the given item - * if(filter fails) - * remove the item from the map but - * send NO removed event - * else if(filter passes) - * send added event for the FILTERED item - * else if (filter is indefinite) - * discard item - * queue another filter attempt for later but - * send NO removed event - */ - private static final class NewOldServiceTask extends ServiceIdTask { - private final ServiceItem srvcItem; - private final boolean matchMatchEvent; - private final LookupCacheImpl cache; - public NewOldServiceTask( - ProxyReg reg, - ServiceItem item, - boolean matchMatchEvent, - long seqN, - LookupCacheImpl cache) - { - super(item.serviceID, reg, seqN); - this.srvcItem = item; - this.matchMatchEvent = matchMatchEvent; - this.cache = cache; - }//end constructor - - public void run() { - logger.finest("ServiceDiscoveryManager - NewOldServiceTask " - +"started"); - boolean previouslyDiscovered = false; - - ServiceItemReg itemReg = null; - itemReg = cache.serviceIdMap.get(thisTaskSid); - if (itemReg == null) { - if( !cache.eventRegMap.containsKey(reg) ) { - /* reg must have been discarded, simply return */ - logger.finest("ServiceDiscoveryManager - " - +"NewOldServiceTask completed"); - return; - }//endif - // else - itemReg = new ServiceItemReg( reg.getProxy(), srvcItem ); - ServiceItemReg existed = cache.serviceIdMap.putIfAbsent(thisTaskSid, itemReg); - if (existed != null){ - itemReg = existed; - previouslyDiscovered = true; - } - } else { - previouslyDiscovered = true; - } - - if(previouslyDiscovered) {//a. old, previously discovered item - // If it didn't get replaced the added sync is reentrant, - // otherwise we sync again in case it did get replaced. - cache.itemMatchMatchChange(thisTaskSid, itemReg, - reg.getProxy(), srvcItem, matchMatchEvent); - } else {//b. newly discovered item - ServiceItem newFilteredItem = - cache.filterMaybeDiscard(thisTaskSid, itemReg, srvcItem, false); - if(newFilteredItem != null) { - cache.addServiceNotify(newFilteredItem); - }//endif - }//endif - logger.finest("ServiceDiscoveryManager - NewOldServiceTask " - +"completed"); - }//end run - }//end class LookupCacheImpl.NewOldServiceTask - - /** Task class used to asynchronously disassociate the given lookup - * service proxy from the given ServiceItemReg. This task is created - * and queued in both the LookupTask, and the ProxyRegDropTask. - * - * When the LookupTask determines that the service referenced by the - * given ServiceItemReg is an "orphan", the LookupTask queues an - * instance of this task. A service is an orphan if it is referenced - * in the serviceIdMap, but is no longer registered in any of the - * lookup service(s) to which it is mapped in the serviceIdMap. - * Note that the existence of orphans is possible when events from - * a particular lookup service are missed; that is, there is a "gap" - * in the event sequence numbers. - * - * When a previously discovered lookup service is discarded, the - * ProxyRegDropTask is initiated, and that task creates and queues - * an instance of this task for each mapping in this cache's - * serviceIdMap. - * - * This task removes the given lookup service proxy from the set - * associated with the service item referenced in the given - * ServiceItemReg, and determines whether that service is still - * associated with at least one lookup service. If the service is - * no longer associated with any other lookup service in the managed - * set of lookup services, the mapping that references the given - * ServiceItemReg is removed from the serviceIdMap, and a - * serviceRemoved event is sent. - * - * In this way, other tasks from this cache operating on the same - * service will not concurrently modify any state related to that - * service. - */ - private static final class UnmapProxyTask extends ServiceIdTask { - private final ServiceItemReg itemReg; - private final LookupCacheImpl cache; - public UnmapProxyTask(ProxyReg reg, ServiceItemReg itemReg, ServiceID srvcId, long seqN, LookupCacheImpl cache) - { - super(srvcId, reg, seqN); - this.itemReg = itemReg; - this.cache = cache; - }//end constructor - - public void run() { - logger.finest("ServiceDiscoveryManager - UnmapProxyTask " - +"started"); - ServiceRegistrar proxy = null; - ServiceItem item; - boolean notify = false; - item = itemReg.removeProxy(reg.getProxy());//disassociate the LUS - if (item != null) {// new LUS chosen to track changes - proxy = itemReg.getProxy(); - } else if( itemReg.hasNoProxys()) {//no more LUSs, remove from map - item = itemReg.filteredItem; - boolean removed = false; - removed = (cache.removeServiceIdMapSendNoEvent(thisTaskSid, itemReg)); - if (removed && !itemReg.isDiscarded()){ - notify = true; - } - }//endif - if(proxy != null) { - cache.itemMatchMatchChange(thisTaskSid, itemReg, proxy, item, false); - }//endif - if (notify) cache.removeServiceNotify(item); - logger.finest("ServiceDiscoveryManager - UnmapProxyTask " - +"completed"); - }//end run - }//end class LookupCacheImpl.UnmapProxyTask - - private final static int ITEM_ADDED = 0; - private final static int ITEM_REMOVED = 2; - private final static int ITEM_CHANGED = 3; - - /* The listener that receives remote events from the lookup services */ - private final LookupListener lookupListener; - /* Exporter for the remote event listener (lookupListener) */ - private volatile Exporter lookupListenerExporter; - /* Proxy to the listener that receives remote events from lookups */ - private volatile RemoteEventListener lookupListenerProxy; - /** Task manager for the various tasks executed by this LookupCache */ - private volatile ExecutorService cacheTaskMgr;
[... 6502 lines stripped ...]
