http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java index 4ac2c9c..6a230e3 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.GemFireCache; import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest; @@ -60,6 +59,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpHandler; import org.apache.geode.distributed.internal.tcpserver.TcpServer; import org.apache.geode.internal.admin.remote.DistributionLocatorId; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.wan.WANServiceProvider; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.InternalLogWriter; @@ -74,7 +74,6 @@ import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.management.internal.JmxManagerLocator; import org.apache.geode.management.internal.JmxManagerLocatorRequest; -import org.apache.geode.management.internal.JmxManagerLocatorResponse; import org.apache.geode.management.internal.cli.CliUtil; import org.apache.geode.management.internal.configuration.domain.SharedConfigurationStatus; import org.apache.geode.management.internal.configuration.handlers.ConfigurationRequestHandler; @@ -84,7 +83,7 @@ import org.apache.geode.management.internal.configuration.messages.SharedConfigu import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusResponse; /** - * Provides the implementation of a distribution <code>Locator</code> as well as internal-only + * Provides the implementation of a distribution {@code Locator} as well as internal-only * functionality. * <p> * This class has APIs that perform essentially three layers of services. At the bottom layer is the @@ -94,24 +93,15 @@ import org.apache.geode.management.internal.configuration.messages.SharedConfigu * Server Location Service DistributedSystem Peer Location Service * <p> * The startLocator() methods provide a way to start all three services in one call. Otherwise, the - * services can be started independently <code> locator = createLocator() + * services can be started independently {@code locator = createLocator() * locator.startPeerLocation(); locator.startDistributeSystem(); * * @since GemFire 4.0 */ public class InternalLocator extends Locator implements ConnectListener { - private static final Logger logger = LogService.getLogger(); /** - * How long (in milliseconds) a member that we haven't heard from in a while should live before we - * call it dead? - */ - private static final long EXPIRY_MS = 60000; // one minute - - private static final int SHARED_CONFIG_STATUS_TIMEOUT = 10000; // 10 seconds - - /** * system property name for forcing an locator distribution manager type */ public static final String FORCE_LOCATOR_DM_TYPE = "Locator.forceLocatorDMType"; @@ -127,8 +117,6 @@ public class InternalLocator extends Locator implements ConnectListener { public static final String LOCATORS_PREFERRED_AS_COORDINATORS = DistributionConfig.GEMFIRE_PREFIX + "disable-floating-coordinator"; - ///////////////////// Instance Fields ////////////////////// - /** * The tcp server responding to locator requests */ @@ -148,7 +136,7 @@ public class InternalLocator extends Locator implements ConnectListener { * The cache owned by this locator, if any. Note that if a cache already exists because the * locator is being colocated in a normal member this field will be null. */ - private Cache myCache; + private InternalCache myCache; /** * locator state file @@ -175,7 +163,7 @@ public class InternalLocator extends Locator implements ConnectListener { private DistributionConfigImpl config; - private LocatorMembershipListener locatorListener; + private final LocatorMembershipListener locatorListener; private WanLocatorDiscoverer locatorDiscoverer; @@ -197,30 +185,29 @@ public class InternalLocator extends Locator implements ConnectListener { private volatile Thread restartThread; - public boolean isSharedConfigurationEnabled() { + boolean isSharedConfigurationEnabled() { return this.config.getEnableClusterConfiguration(); } - public boolean loadFromSharedConfigDir() { + private boolean loadFromSharedConfigDir() { return this.config.getLoadClusterConfigFromDir(); } public boolean isSharedConfigurationRunning() { - if (this.sharedConfig != null) { - return this.sharedConfig.getStatus() == SharedConfigurationStatus.RUNNING; - } else { - return false; - } + return this.sharedConfig != null + && this.sharedConfig.getStatus() == SharedConfigurationStatus.RUNNING; } - ////////////////////// Static Methods ///////////////////// - /** * the locator hosted by this JVM. As of 7.0 it is a singleton. + * + * GuardedBy must synchronize on locatorLock */ - private static InternalLocator locator; // must synchronize on locatorLock + private static InternalLocator locator; + private static final Object locatorLock = new Object(); + // TODO: getLocator() overrides static method of a superclass public static InternalLocator getLocator() { // synchronize in order to fix #46336 (race condition in createLocator) synchronized (locatorLock) { @@ -228,20 +215,22 @@ public class InternalLocator extends Locator implements ConnectListener { } } + // TODO: hasLocator() overrides static method of a superclass public static boolean hasLocator() { synchronized (locatorLock) { return locator != null; } } - private static boolean removeLocator(InternalLocator l) { - if (l == null) { + // TODO: return value of removeLocator is never used + private static boolean removeLocator(InternalLocator locator) { + if (locator == null) { return false; } synchronized (locatorLock) { if (hasLocator()) { - if (l.equals(locator)) { - locator = null; + if (locator.equals(InternalLocator.locator)) { + InternalLocator.locator = null; return true; } } @@ -269,32 +258,31 @@ public class InternalLocator extends Locator implements ConnectListener { */ public static InternalLocator createLocator(int port, File logFile, File stateFile, InternalLogWriter logger, InternalLogWriter securityLogger, InetAddress bindAddress, - String hostnameForClients, java.util.Properties distributedSystemProperties, - boolean startDistributedSystem) throws IOException { + String hostnameForClients, Properties distributedSystemProperties, + boolean startDistributedSystem) { synchronized (locatorLock) { if (hasLocator()) { throw new IllegalStateException( "A locator can not be created because one already exists in this JVM."); } - InternalLocator l = + InternalLocator locator = new InternalLocator(port, logFile, stateFile, logger, securityLogger, bindAddress, hostnameForClients, distributedSystemProperties, null, startDistributedSystem); - locator = l; - return l; + InternalLocator.locator = locator; + return locator; } } - private static void setLocator(InternalLocator l) { + private static void setLocator(InternalLocator locator) { synchronized (locatorLock) { - if (locator != null && locator != l) { + if (InternalLocator.locator != null && InternalLocator.locator != locator) { throw new IllegalStateException( "A locator can not be created because one already exists in this JVM."); } - locator = l; + InternalLocator.locator = locator; } } - /** * Creates a distribution locator that runs in this VM on the given port and bind address and * creates a distributed system. @@ -306,7 +294,6 @@ public class InternalLocator extends Locator implements ConnectListener { * @param dsProperties optional properties to configure the distributed system (e.g., mcast * addr/port, other locators) * @param hostnameForClients the name to give to clients for connecting to this locator - * @throws IOException * @since GemFire 7.0 */ public static InternalLocator startLocator(int port, File logFile, File stateFile, @@ -316,7 +303,6 @@ public class InternalLocator extends Locator implements ConnectListener { dsProperties, hostnameForClients); } - /** * Creates a distribution locator that runs in this VM on the given port and bind address. * <p> @@ -330,8 +316,6 @@ public class InternalLocator extends Locator implements ConnectListener { * @param dsProperties optional properties to configure the distributed system (e.g., mcast * addr/port, other locators) * @param hostnameForClients the name to give to clients for connecting to this locator - * - * @throws IOException */ public static InternalLocator startLocator(int port, File logFile, File stateFile, InternalLogWriter logger, InternalLogWriter securityLogger, InetAddress bindAddress, @@ -339,58 +323,57 @@ public class InternalLocator extends Locator implements ConnectListener { throws IOException { System.setProperty(FORCE_LOCATOR_DM_TYPE, "true"); - InternalLocator slocator = null; + InternalLocator newLocator = null; boolean startedLocator = false; try { - slocator = createLocator(port, logFile, stateFile, logger, securityLogger, bindAddress, + newLocator = createLocator(port, logFile, stateFile, logger, securityLogger, bindAddress, hostnameForClients, dsProperties, startDistributedSystem); // TODO:GEODE-1243: this.server is now a TcpServer and it should store or return its non-zero // port in a variable to use here try { - slocator.startPeerLocation(startDistributedSystem); + newLocator.startPeerLocation(startDistributedSystem); if (startDistributedSystem) { try { - slocator.startDistributedSystem(); // TODO:GEODE-1243: throws Exception if TcpServer - // still has zero for its locator port + // TODO:GEODE-1243: throws Exception if TcpServer still has zero for its locator port + newLocator.startDistributedSystem(); } catch (RuntimeException e) { - slocator.stop(); + newLocator.stop(); throw e; } // fix bug #46324 - final InternalDistributedSystem ids = (InternalDistributedSystem) slocator.myDs; + final InternalDistributedSystem ids = newLocator.myDs; if (ids != null) { ids.getDistributionManager().addHostedLocators(ids.getDistributedMember(), - getLocatorStrings(), slocator.isSharedConfigurationEnabled()); + getLocatorStrings(), newLocator.isSharedConfigurationEnabled()); } } - } catch (LocatorCancelException e) { - slocator.stop(); + } catch (final LocatorCancelException ignored) { + newLocator.stop(); } - InternalDistributedSystem sys = InternalDistributedSystem.getConnectedInstance(); if (sys != null) { try { - slocator.startServerLocation(sys); + newLocator.startServerLocation(sys); } catch (RuntimeException e) { - slocator.stop(); + newLocator.stop(); throw e; } } - slocator.endStartLocator(null); + newLocator.endStartLocator(null); startedLocator = true; - return slocator; + return newLocator; } finally { System.getProperties().remove(FORCE_LOCATOR_DM_TYPE); if (!startedLocator) { // fix for bug 46314 - removeLocator(slocator); + removeLocator(newLocator); } } } @@ -407,7 +390,7 @@ public class InternalLocator extends Locator implements ConnectListener { return false; } - InternalDistributedSystem ids = (InternalDistributedSystem) internalLocator.myDs; + InternalDistributedSystem ids = internalLocator.myDs; if (ids == null) { return false; } @@ -419,10 +402,8 @@ public class InternalLocator extends Locator implements ConnectListener { return distMgr.getDMType() == DistributionManager.LOCATOR_DM_TYPE; } - /////////////////////// Constructors ////////////////////// - /** - * Creates a new <code>Locator</code> with the given port, log file, logger, and bind address. + * Creates a new {@code Locator} with the given port, log file, logger, and bind address. * * @param port the tcp/ip port to listen on * @param logF the file that log messages should be written to @@ -441,12 +422,14 @@ public class InternalLocator extends Locator implements ConnectListener { InternalLogWriter securityLogWriter, // LOG: 1 non-null source: GemFireDistributionLocator(same instance as logWriter), // InternalDistributedSystem - InetAddress bindAddress, String hostnameForClients, - java.util.Properties distributedSystemProperties, DistributionConfigImpl cfg, - boolean startDistributedSystem) { + InetAddress bindAddress, String hostnameForClients, Properties distributedSystemProperties, + DistributionConfigImpl cfg, boolean startDistributedSystem) { + + // TODO: the following three assignments are already done in superclass this.logFile = logF; this.bindAddress = bindAddress; this.hostnameForClients = hostnameForClients; + if (stateF == null) { this.stateFile = new File("locator" + port + "view.dat"); } else { @@ -456,23 +439,23 @@ public class InternalLocator extends Locator implements ConnectListener { this.productUseLog = new ProductUseLog(productUseFile); this.config = cfg; - env = new Properties(); + this.env = new Properties(); // set bind-address explicitly only if not wildcard and let any explicit // value in distributedSystemProperties take precedence (#46870) if (bindAddress != null && !bindAddress.isAnyLocalAddress()) { - env.setProperty(BIND_ADDRESS, bindAddress.getHostAddress()); + this.env.setProperty(BIND_ADDRESS, bindAddress.getHostAddress()); } if (distributedSystemProperties != null) { - env.putAll(distributedSystemProperties); + this.env.putAll(distributedSystemProperties); } - env.setProperty(CACHE_XML_FILE, ""); + this.env.setProperty(CACHE_XML_FILE, ""); // create a DC so that all of the lookup rules, gemfire.properties, etc, // are considered and we have a config object we can trust if (this.config == null) { - this.config = new DistributionConfigImpl(env); + this.config = new DistributionConfigImpl(this.env); this.env.clear(); this.env.putAll(this.config.getProps()); } @@ -480,9 +463,9 @@ public class InternalLocator extends Locator implements ConnectListener { final boolean hasLogFileButConfigDoesNot = this.logFile != null && this.config.getLogFile() .toString().equals(DistributionConfig.DEFAULT_LOG_FILE.toString()); if (logWriter == null && hasLogFileButConfigDoesNot) { - this.config.unsafeSetLogFile(this.logFile); // LOG: this is(was) a hack for when logFile and - // config don't match -- if config specifies a - // different log-file things will break! + // LOG: this is(was) a hack for when logFile and config don't match -- if config specifies a + // different log-file things will break! + this.config.unsafeSetLogFile(this.logFile); } // LOG: create LogWriterAppenders (these are closed at shutdown) @@ -520,26 +503,24 @@ public class InternalLocator extends Locator implements ConnectListener { if (securityLogWriter == null) { securityLogWriter = LogWriterFactory.createLogWriterLogger(false, true, this.config, false); - ((LogWriterLogger) logWriter).setLogWriterLevel(this.config.getSecurityLogLevel()); + logWriter.setLogWriterLevel(this.config.getSecurityLogLevel()); securityLogWriter.fine("SecurityLogWriter for locator is created."); } SocketCreatorFactory.setDistributionConfig(this.config); this.locatorListener = WANServiceProvider.createLocatorMembershipListener(); - if (locatorListener != null) { + if (this.locatorListener != null) { // We defer setting the port until the handler is init'd - that way we'll have an actual port - // in the - // case where we're starting with port = 0. - this.locatorListener.setConfig(this.getConfig()); + // in the case where we're starting with port = 0. + this.locatorListener.setConfig(getConfig()); } this.handler = new PrimaryHandler(this, locatorListener); ThreadGroup group = LoggingThreadGroup.createThreadGroup("Distribution locators", logger); - stats = new LocatorStats(); + this.stats = new LocatorStats(); - - server = new TcpServer(port, this.bindAddress, null, this.config, this.handler, + this.server = new TcpServer(port, this.bindAddress, null, this.config, this.handler, new DelayedPoolStatHelper(), group, this.toString()); } @@ -551,10 +532,9 @@ public class InternalLocator extends Locator implements ConnectListener { this.productUseLog = new ProductUseLog(productUseFile); } - private void startTcpServer() throws IOException { logger.info(LocalizedMessage.create(LocalizedStrings.InternalLocator_STARTING_0, this)); - server.start(); + this.server.start(); } public ClusterConfigurationService getSharedConfiguration() { @@ -562,19 +542,20 @@ public class InternalLocator extends Locator implements ConnectListener { } public DistributionConfigImpl getConfig() { - return config; + return this.config; } /** * Start peer location in this locator. If you plan on starting a distributed system later, this * method should be called first so that the distributed system can use this locator. - * + * <p> + * TODO: parameter withDS is never used + * * @param withDS true if a distributed system has been or will be started * - * @throws IOException * @since GemFire 5.7 */ - public void startPeerLocation(boolean withDS) throws IOException { + void startPeerLocation(boolean withDS) throws IOException { if (isPeerLocator()) { throw new IllegalStateException( LocalizedStrings.InternalLocator_PEER_LOCATION_IS_ALREADY_RUNNING_FOR_0 @@ -595,18 +576,18 @@ public class InternalLocator extends Locator implements ConnectListener { } else { // check if security is enabled String prop = this.config.getSecurityPeerAuthInit(); - locatorsAreCoordinators = (prop != null && prop.length() > 0); + locatorsAreCoordinators = prop != null && !prop.isEmpty(); if (!locatorsAreCoordinators) { locatorsAreCoordinators = Boolean.getBoolean(LOCATORS_PREFERRED_AS_COORDINATORS); } } - this.locatorImpl = - MemberFactory.newLocatorHandler(this.bindAddress, this.stateFile, locatorsProp, - locatorsAreCoordinators, networkPartitionDetectionEnabled, stats, securityUDPDHAlgo); + this.locatorImpl = MemberFactory.newLocatorHandler(this.bindAddress, this.stateFile, + locatorsProp, locatorsAreCoordinators, networkPartitionDetectionEnabled, this.stats, + securityUDPDHAlgo); this.handler.addHandler(PeerLocatorRequest.class, this.locatorImpl); - peerLocator = true; - if (!server.isAlive()) { + this.peerLocator = true; + if (!this.server.isAlive()) { startTcpServer(); } } @@ -624,15 +605,17 @@ public class InternalLocator extends Locator implements ConnectListener { /** * For backward-compatibility we retain this method + * <p> + * TODO: parameters peerLocator and serverLocator and b1 are never used * * @deprecated use a form of the method that does not have peerLocator/serverLocator parameters */ + @Deprecated public static InternalLocator startLocator(int locatorPort, File logFile, File stateFile, InternalLogWriter logger, InternalLogWriter logger1, InetAddress addr, Properties dsProperties, boolean peerLocator, boolean serverLocator, String s, boolean b1) throws IOException { return startLocator(locatorPort, logFile, stateFile, logger, logger1, addr, dsProperties, s); - } class SharedConfigurationRunnable implements Runnable { @@ -642,23 +625,19 @@ public class InternalLocator extends Locator implements ConnectListener { @Override public void run() { try { - if (locator.sharedConfig == null) { + if (this.locator.sharedConfig == null) { // locator.sharedConfig will already be created in case of auto-reconnect - locator.sharedConfig = new ClusterConfigurationService(locator.myCache); + this.locator.sharedConfig = new ClusterConfigurationService(locator.myCache); } - locator.sharedConfig.initSharedConfiguration(locator.loadFromSharedConfigDir()); - locator.installSharedConfigDistribution(); + this.locator.sharedConfig.initSharedConfiguration(this.locator.loadFromSharedConfigDir()); + this.locator.installSharedConfigDistribution(); logger.info( "Cluster configuration service start up completed successfully and is now running ...."); - } catch (CancelException e) { + } catch (CancelException | LockServiceDestroyedException e) { if (logger.isDebugEnabled()) { logger.debug("Cluster configuration start up was cancelled", e); } - } catch (LockServiceDestroyedException e) { - if (logger.isDebugEnabled()) { - logger.debug("Cluster configuration start up was cancelled", e); - } - } catch (Throwable e) { + } catch (Exception e) { logger.error(e.getMessage(), e); } } @@ -669,10 +648,9 @@ public class InternalLocator extends Locator implements ConnectListener { * stopped, this distributed system will be disconnected. If a distributed system already exists, * this method will have no affect. * - * @throws UnknownHostException * @since GemFire 5.7 */ - public void startDistributedSystem() throws UnknownHostException { + private void startDistributedSystem() throws UnknownHostException { InternalDistributedSystem existing = InternalDistributedSystem.getConnectedInstance(); if (existing != null) { @@ -681,27 +659,25 @@ public class InternalLocator extends Locator implements ConnectListener { .create(LocalizedStrings.InternalLocator_USING_EXISTING_DISTRIBUTED_SYSTEM__0, existing)); startCache(existing); } else { - String thisLocator; - { - StringBuilder sb = new StringBuilder(100); - if (bindAddress != null) { - sb.append(bindAddress.getHostAddress()); - } else { - sb.append(SocketCreator.getLocalHost().getHostAddress()); - } - sb.append('[').append(getPort()).append(']'); - thisLocator = sb.toString(); + + StringBuilder sb = new StringBuilder(100); + if (this.bindAddress != null) { + sb.append(this.bindAddress.getHostAddress()); + } else { + sb.append(SocketCreator.getLocalHost().getHostAddress()); } + sb.append('[').append(getPort()).append(']'); + String thisLocator = sb.toString(); - if (peerLocator) { + if (this.peerLocator) { // append this locator to the locators list from the config properties // this.logger.config("ensuring that this locator is in the locators list"); boolean setLocatorsProp = false; String locatorsProp = this.config.getLocators(); - if (locatorsProp != null && locatorsProp.trim().length() > 0) { + if (locatorsProp != null && !locatorsProp.trim().isEmpty()) { if (!locatorsProp.contains(thisLocator)) { - locatorsProp = locatorsProp + "," + thisLocator; + locatorsProp = locatorsProp + ',' + thisLocator; setLocatorsProp = true; } } else { @@ -722,7 +698,6 @@ public class InternalLocator extends Locator implements ConnectListener { // No longer default mcast-port to zero. See 46277. } - Properties connectEnv = new Properties(); // LogWriterAppender is now shared via that class // using a DistributionConfig earlier in this method @@ -736,13 +711,13 @@ public class InternalLocator extends Locator implements ConnectListener { LocalizedStrings.InternalDistributedSystem_STARTUP_CONFIGURATIONN_0, this.config.toLoggerString())); - myDs = (InternalDistributedSystem) DistributedSystem.connect(connectEnv); + this.myDs = (InternalDistributedSystem) DistributedSystem.connect(connectEnv); - if (peerLocator) { - this.locatorImpl.setMembershipManager(myDs.getDM().getMembershipManager()); + if (this.peerLocator) { + this.locatorImpl.setMembershipManager(this.myDs.getDM().getMembershipManager()); } - myDs.addDisconnectListener(new DisconnectListener() { + this.myDs.addDisconnectListener(new DisconnectListener() { @Override public void onDisconnect(InternalDistributedSystem sys) { stop(false, false, false); @@ -754,25 +729,23 @@ public class InternalLocator extends Locator implements ConnectListener { logger.info(LocalizedMessage.create(LocalizedStrings.InternalLocator_LOCATOR_STARTED_ON__0, thisLocator)); - ((InternalDistributedSystem) myDs).setDependentLocator(this); + myDs.setDependentLocator(this); } } - private void startCache(DistributedSystem ds) { - - GemFireCacheImpl gfc = GemFireCacheImpl.getInstance(); - if (gfc == null) { + InternalCache internalCache = GemFireCacheImpl.getInstance(); + if (internalCache == null) { logger.info("Creating cache for locator."); - this.myCache = new CacheFactory(ds.getProperties()).create(); - gfc = (GemFireCacheImpl) this.myCache; + this.myCache = (InternalCache) new CacheFactory(ds.getProperties()).create(); + internalCache = this.myCache; } else { logger.info("Using existing cache for locator."); ((InternalDistributedSystem) ds).handleResourceEvent(ResourceEvent.LOCATOR_START, this); } - startJmxManagerLocationService(gfc); + startJmxManagerLocationService(internalCache); - startSharedConfigurationService(gfc); + startSharedConfigurationService(internalCache); } /** @@ -781,12 +754,10 @@ public class InternalLocator extends Locator implements ConnectListener { * * @param distributedSystem The distributed system to use for the statistics. * - * @throws UnknownHostException * @since GemFire 5.7 */ - public void endStartLocator(InternalDistributedSystem distributedSystem) - throws UnknownHostException { - env = null; + void endStartLocator(InternalDistributedSystem distributedSystem) throws UnknownHostException { + this.env = null; if (distributedSystem == null) { distributedSystem = InternalDistributedSystem.getConnectedInstance(); } @@ -798,7 +769,8 @@ public class InternalLocator extends Locator implements ConnectListener { this.locatorDiscoverer = WANServiceProvider.createLocatorDiscoverer(); if (this.locatorDiscoverer != null) { - this.locatorDiscoverer.discover(getPort(), config, locatorListener, hostnameForClients); + this.locatorDiscoverer.discover(getPort(), this.config, this.locatorListener, + this.hostnameForClients); } } @@ -811,7 +783,7 @@ public class InternalLocator extends Locator implements ConnectListener { * * @since GemFire 5.7 */ - public void startServerLocation(InternalDistributedSystem distributedSystem) throws IOException { + void startServerLocation(InternalDistributedSystem distributedSystem) throws IOException { if (isServerLocator()) { throw new IllegalStateException( LocalizedStrings.InternalLocator_SERVER_LOCATION_IS_ALREADY_RUNNING_FOR_0 @@ -831,16 +803,17 @@ public class InternalLocator extends Locator implements ConnectListener { this.productUseLog.monitorUse(distributedSystem); - ServerLocator sl = new ServerLocator(getPort(), this.bindAddress, this.hostnameForClients, - this.logFile, this.productUseLog, getConfig().getName(), distributedSystem, stats); - this.handler.addHandler(LocatorListRequest.class, sl); - this.handler.addHandler(ClientConnectionRequest.class, sl); - this.handler.addHandler(QueueConnectionRequest.class, sl); - this.handler.addHandler(ClientReplacementRequest.class, sl); - this.handler.addHandler(GetAllServersRequest.class, sl); - this.handler.addHandler(LocatorStatusRequest.class, sl); - this.serverLocator = sl; - if (!server.isAlive()) { + ServerLocator serverLocator = + new ServerLocator(getPort(), this.bindAddress, this.hostnameForClients, this.logFile, + this.productUseLog, getConfig().getName(), distributedSystem, this.stats); + this.handler.addHandler(LocatorListRequest.class, serverLocator); + this.handler.addHandler(ClientConnectionRequest.class, serverLocator); + this.handler.addHandler(QueueConnectionRequest.class, serverLocator); + this.handler.addHandler(ClientReplacementRequest.class, serverLocator); + this.handler.addHandler(GetAllServersRequest.class, serverLocator); + this.handler.addHandler(LocatorStatusRequest.class, serverLocator); + this.serverLocator = serverLocator; + if (!this.server.isAlive()) { startTcpServer(); } } @@ -851,14 +824,6 @@ public class InternalLocator extends Locator implements ConnectListener { @Override public void stop() { stop(false, false, true); - // SocketCreatorFactory.close(); - } - - /** - * Was this locator stopped during forced-disconnect processing but should reconnect? - */ - public boolean getStoppedForReconnect() { - return this.stoppedForReconnect; } /** @@ -886,7 +851,7 @@ public class InternalLocator extends Locator implements ConnectListener { while (this.server.isAlive() && System.currentTimeMillis() < endOfWait) { try { Thread.sleep(500); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); return; } @@ -943,14 +908,13 @@ public class InternalLocator extends Locator implements ConnectListener { logger.info(LocalizedMessage.create(LocalizedStrings.InternalLocator_0__IS_STOPPED, this)); - if (stoppedForReconnect) { + if (this.stoppedForReconnect) { if (this.myDs != null) { launchRestartThread(); } } } - /** * answers whether this locator is currently stopped */ @@ -962,21 +926,21 @@ public class InternalLocator extends Locator implements ConnectListener { if (!this.shutdownHandled.compareAndSet(false, true)) { return; // already shutdown } - productUseLog.close(); - if (myDs != null) { - ((InternalDistributedSystem) myDs).setDependentLocator(null); + this.productUseLog.close(); + if (this.myDs != null) { + this.myDs.setDependentLocator(null); } if (this.myCache != null && !this.stoppedForReconnect && !this.forcedDisconnect) { logger.info("Closing locator's cache"); try { this.myCache.close(); } catch (RuntimeException ex) { - logger.info("Could not close locator's cache because: {}", ex); + logger.info("Could not close locator's cache because: {}", ex.getMessage(), ex); } } - if (stats != null) { - stats.close(); + if (this.stats != null) { + this.stats.close(); } if (this.locatorListener != null) { @@ -984,11 +948,11 @@ public class InternalLocator extends Locator implements ConnectListener { } this.isSharedConfigurationStarted = false; - if (myDs != null && !this.forcedDisconnect) { - if (myDs.isConnected()) { + if (this.myDs != null && !this.forcedDisconnect) { + if (this.myDs.isConnected()) { logger.info(LocalizedMessage .create(LocalizedStrings.InternalLocator_DISCONNECTING_DISTRIBUTED_SYSTEM_FOR_0, this)); - myDs.disconnect(); + this.myDs.disconnect(); } } } @@ -1016,10 +980,10 @@ public class InternalLocator extends Locator implements ConnectListener { } else { logger.info("system was not restarted"); } - Thread rs = this.restartThread; - if (rs != null) { + Thread restartThread = this.restartThread; + if (restartThread != null) { logger.info("waiting for services to restart..."); - rs.join(); + restartThread.join(); this.restartThread = null; logger.info("done waiting for services to restart"); } @@ -1033,7 +997,9 @@ public class InternalLocator extends Locator implements ConnectListener { private void launchRestartThread() { // create a thread group having a last-chance exception-handler ThreadGroup group = LoggingThreadGroup.createThreadGroup("Locator restart thread group"); + // TODO: non-atomic operation on volatile field restartThread this.restartThread = new Thread(group, "Location services restart thread") { + @Override public void run() { boolean restarted = false; try { @@ -1064,7 +1030,7 @@ public class InternalLocator extends Locator implements ConnectListener { * * @return true if able to reconnect the locator to the new distributed system */ - public boolean attemptReconnect() throws InterruptedException, IOException { + private boolean attemptReconnect() throws InterruptedException, IOException { boolean restarted = false; if (this.stoppedForReconnect) { logger.info("attempting to restart locator"); @@ -1099,9 +1065,7 @@ public class InternalLocator extends Locator implements ConnectListener { ds.waitUntilReconnected(waitTime, TimeUnit.MILLISECONDS); } InternalDistributedSystem newSystem = (InternalDistributedSystem) ds.getReconnectedSystem(); - // LogWriter log = new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out); if (newSystem != null) { - // log.fine("reconnecting locator: starting location services"); if (!tcpServerStarted) { if (this.locatorListener != null) { this.locatorListener.clearLocatorInfo(); @@ -1113,11 +1077,10 @@ public class InternalLocator extends Locator implements ConnectListener { restarted = true; } } - logger.info("restart thread exiting. Service was " + (restarted ? "" : "not ") + "restarted"); + logger.info("restart thread exiting. Service was {}restarted", restarted ? "" : "not "); return restarted; } - private void restartWithoutDS() throws IOException { synchronized (locatorLock) { if (locator != this && hasLocator()) { @@ -1138,7 +1101,7 @@ public class InternalLocator extends Locator implements ConnectListener { } } - private void restartWithDS(InternalDistributedSystem newSystem, GemFireCacheImpl newCache) + private void restartWithDS(InternalDistributedSystem newSystem, InternalCache newCache) throws IOException { synchronized (locatorLock) { if (locator != this && hasLocator()) { @@ -1147,7 +1110,7 @@ public class InternalLocator extends Locator implements ConnectListener { } this.myDs = newSystem; this.myCache = newCache; - ((InternalDistributedSystem) myDs).setDependentLocator(this); + this.myDs.setDependentLocator(this); logger.info("Locator restart: initializing TcpServer"); if (isSharedConfigurationEnabled()) { this.sharedConfig = new ClusterConfigurationService(newCache); @@ -1168,21 +1131,19 @@ public class InternalLocator extends Locator implements ConnectListener { } logger.info("Locator restart: initializing JMX manager"); startJmxManagerLocationService(newCache); - endStartLocator((InternalDistributedSystem) myDs); + endStartLocator(this.myDs); logger.info("Locator restart completed"); } } - - // implementation of abstract method in Locator @Override public DistributedSystem getDistributedSystem() { - return myDs; + return this.myDs; } @Override public boolean isPeerLocator() { - return peerLocator; + return this.peerLocator; } @Override @@ -1204,25 +1165,21 @@ public class InternalLocator extends Locator implements ConnectListener { * * @return the port the locator is listening on or null if it has not yet been started */ + @Override public Integer getPort() { - if (server != null) { - return server.getPort(); + if (this.server != null) { + return this.server.getPort(); } return null; } - /****** - * - * - */ class FetchSharedConfigStatus implements Callable<SharedConfigurationStatusResponse> { static final int SLEEPTIME = 1000; static final byte MAX_RETRIES = 5; - public SharedConfigurationStatusResponse call() throws Exception { - SharedConfigurationStatusResponse response; - + @Override + public SharedConfigurationStatusResponse call() throws InterruptedException { final InternalLocator locator = InternalLocator.this; for (int i = 0; i < MAX_RETRIES; i++) { if (locator.sharedConfig != null) { @@ -1234,6 +1191,7 @@ public class InternalLocator extends Locator implements ConnectListener { } Thread.sleep(SLEEPTIME); } + SharedConfigurationStatusResponse response; if (locator.sharedConfig != null) { response = locator.sharedConfig.createStatusResponse(); } else { @@ -1244,57 +1202,52 @@ public class InternalLocator extends Locator implements ConnectListener { } } - public SharedConfigurationStatusResponse getSharedConfigurationStatus() { - ExecutorService es = - ((GemFireCacheImpl) myCache).getDistributionManager().getWaitingThreadPool(); + ExecutorService es = this.myCache.getDistributionManager().getWaitingThreadPool(); Future<SharedConfigurationStatusResponse> statusFuture = es.submit(new FetchSharedConfigStatus()); - SharedConfigurationStatusResponse response = null; + SharedConfigurationStatusResponse response; try { response = statusFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { - logger.info("Exception occured while fetching the status {}", CliUtil.stackTraceAsString(e)); + logger.info("Exception occurred while fetching the status {}", CliUtil.stackTraceAsString(e)); response = new SharedConfigurationStatusResponse(); response.setStatus(SharedConfigurationStatus.UNDETERMINED); } return response; } - public static class PrimaryHandler implements TcpHandler { - private volatile HashMap<Class, TcpHandler> handlerMapping = new HashMap<Class, TcpHandler>(); - private volatile HashSet<TcpHandler> allHandlers = new HashSet<TcpHandler>(); + private volatile HashMap<Class, TcpHandler> handlerMapping = new HashMap<>(); + private volatile HashSet<TcpHandler> allHandlers = new HashSet<>(); private TcpServer tcpServer; private final LocatorMembershipListener locatorListener; - // private final List<LocatorJoinMessage> locatorJoinMessages; - private Object locatorJoinObject = new Object(); - private InternalLocator internalLocator; + private final InternalLocator internalLocator; // GEODE-2253 test condition private boolean hasWaitedForHandlerInitialization = false; - public PrimaryHandler(InternalLocator locator, LocatorMembershipListener listener) { + PrimaryHandler(InternalLocator locator, LocatorMembershipListener listener) { this.locatorListener = listener; - internalLocator = locator; - // this.locatorJoinMessages = new ArrayList<LocatorJoinMessage>(); + this.internalLocator = locator; } // this method is synchronized to make sure that no new handlers are added while // initialization is taking place. + @Override public synchronized void init(TcpServer tcpServer) { if (this.locatorListener != null) { // This is deferred until now as the initial requested port could have been 0 - this.locatorListener.setPort(internalLocator.getPort()); + this.locatorListener.setPort(this.internalLocator.getPort()); } this.tcpServer = tcpServer; - for (Iterator itr = allHandlers.iterator(); itr.hasNext();) { - TcpHandler handler = (TcpHandler) itr.next(); + for (TcpHandler handler : this.allHandlers) { handler.init(tcpServer); } } + @Override public void restarting(DistributedSystem ds, GemFireCache cache, ClusterConfigurationService sharedConfig) { if (ds != null) { @@ -1304,33 +1257,35 @@ public class InternalLocator extends Locator implements ConnectListener { } } + @Override public Object processRequest(Object request) throws IOException { long giveup = 0; while (giveup == 0 || System.currentTimeMillis() < giveup) { TcpHandler handler = null; if (request instanceof PeerLocatorRequest) { - handler = (TcpHandler) handlerMapping.get(PeerLocatorRequest.class); + handler = this.handlerMapping.get(PeerLocatorRequest.class); } else { - handler = (TcpHandler) handlerMapping.get(request.getClass()); + handler = this.handlerMapping.get(request.getClass()); } if (handler != null) { return handler.processRequest(request); } else { - if (locatorListener != null) { - return locatorListener.handleRequest(request); + if (this.locatorListener != null) { + return this.locatorListener.handleRequest(request); } else { // either there is a configuration problem or the locator is still starting up if (giveup == 0) { - int locatorWaitTime = internalLocator.getConfig().getLocatorWaitTime(); + int locatorWaitTime = this.internalLocator.getConfig().getLocatorWaitTime(); if (locatorWaitTime <= 0) { - locatorWaitTime = 30; // always retry some number of times + // always retry some number of times + locatorWaitTime = 30; } - hasWaitedForHandlerInitialization = true; - giveup = System.currentTimeMillis() + (locatorWaitTime * 1000); + this.hasWaitedForHandlerInitialization = true; + giveup = System.currentTimeMillis() + locatorWaitTime * 1000; try { Thread.sleep(1000); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { // running in an executor - no need to set the interrupted flag on the thread return null; } @@ -1350,19 +1305,13 @@ public class InternalLocator extends Locator implements ConnectListener { * installed? */ public boolean hasWaitedForHandlerInitialization() { - return hasWaitedForHandlerInitialization; - } - - private JmxManagerLocatorResponse findJmxManager(JmxManagerLocatorRequest request) { - JmxManagerLocatorResponse result = null; - // NYI - return result; + return this.hasWaitedForHandlerInitialization; } + @Override public void shutDown() { try { - for (Iterator itr = allHandlers.iterator(); itr.hasNext();) { - TcpHandler handler = (TcpHandler) itr.next(); + for (TcpHandler handler : this.allHandlers) { handler.shutDown(); } } finally { @@ -1370,42 +1319,45 @@ public class InternalLocator extends Locator implements ConnectListener { } } - public synchronized boolean isHandled(Class clazz) { + synchronized boolean isHandled(Class clazz) { return this.handlerMapping.containsKey(clazz); } public synchronized void addHandler(Class clazz, TcpHandler handler) { - HashMap tmpHandlerMapping = new HashMap(handlerMapping); - HashSet tmpAllHandlers = new HashSet(allHandlers); + HashMap<Class, TcpHandler> tmpHandlerMapping = new HashMap<>(this.handlerMapping); + HashSet<TcpHandler> tmpAllHandlers = new HashSet<>(this.allHandlers); tmpHandlerMapping.put(clazz, handler); - if (tmpAllHandlers.add(handler) && tcpServer != null) { - handler.init(tcpServer); + if (tmpAllHandlers.add(handler) && this.tcpServer != null) { + handler.init(this.tcpServer); } - handlerMapping = tmpHandlerMapping; - allHandlers = tmpAllHandlers; + this.handlerMapping = tmpHandlerMapping; + this.allHandlers = tmpAllHandlers; } + @Override public void endRequest(Object request, long startTime) { - TcpHandler handler = (TcpHandler) handlerMapping.get(request.getClass()); + TcpHandler handler = this.handlerMapping.get(request.getClass()); if (handler != null) { handler.endRequest(request, startTime); } } + @Override public void endResponse(Object request, long startTime) { - TcpHandler handler = (TcpHandler) handlerMapping.get(request.getClass()); + TcpHandler handler = this.handlerMapping.get(request.getClass()); if (handler != null) { handler.endResponse(request, startTime); } } } + @Override public void onConnect(InternalDistributedSystem sys) { try { - stats.hookupStats(sys, SocketCreator.getLocalHost().getCanonicalHostName() + "-" - + server.getBindAddress().toString()); - } catch (UnknownHostException uhe) { - uhe.printStackTrace(); + this.stats.hookupStats(sys, + SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress()); + } catch (UnknownHostException e) { + logger.warn(e); } } @@ -1416,12 +1368,12 @@ public class InternalLocator extends Locator implements ConnectListener { * @see #getLocators() */ public static Collection<String> getLocatorStrings() { - Collection<String> locatorStrings = null; + Collection<String> locatorStrings; try { Collection<DistributionLocatorId> locatorIds = DistributionLocatorId.asDistributionLocatorIds(getLocators()); locatorStrings = DistributionLocatorId.asStrings(locatorIds); - } catch (UnknownHostException e) { + } catch (UnknownHostException ignored) { locatorStrings = null; } if (locatorStrings == null || locatorStrings.isEmpty()) { @@ -1436,19 +1388,19 @@ public class InternalLocator extends Locator implements ConnectListener { * recorded if a distributed system is started. */ protected class DelayedPoolStatHelper implements PoolStatHelper { - + @Override public void startJob() { stats.incRequestInProgress(1); } + @Override public void endJob() { stats.incRequestInProgress(-1); } } - public void startSharedConfigurationService(GemFireCacheImpl gfc) { - + private void startSharedConfigurationService(InternalCache internalCache) { installSharedConfigHandler(); if (this.config.getEnableClusterConfiguration() && !this.isSharedConfigurationStarted) { @@ -1458,25 +1410,26 @@ public class InternalLocator extends Locator implements ConnectListener { return; } - ExecutorService es = gfc.getDistributionManager().getThreadPool(); + ExecutorService es = internalCache.getDistributionManager().getThreadPool(); es.execute(new SharedConfigurationRunnable()); } else { logger.info("Cluster configuration service is disabled"); } } - public void startJmxManagerLocationService(GemFireCacheImpl gfc) { - if (gfc.getJmxManagerAdvisor() != null) { + public void startJmxManagerLocationService(InternalCache internalCache) { + if (internalCache.getJmxManagerAdvisor() != null) { if (!this.handler.isHandled(JmxManagerLocatorRequest.class)) { - this.handler.addHandler(JmxManagerLocatorRequest.class, new JmxManagerLocator(gfc)); + this.handler.addHandler(JmxManagerLocatorRequest.class, + new JmxManagerLocator(internalCache)); } } } - /*** + /** * Creates and installs the handler {@link ConfigurationRequestHandler} */ - public void installSharedConfigDistribution() { + private void installSharedConfigDistribution() { if (!this.handler.isHandled(ConfigurationRequest.class)) { this.handler.addHandler(ConfigurationRequest.class, new ConfigurationRequestHandler(this.sharedConfig)); @@ -1484,7 +1437,7 @@ public class InternalLocator extends Locator implements ConnectListener { } } - public void installSharedConfigHandler() { + private void installSharedConfigHandler() { if (!this.handler.isHandled(SharedConfigurationStatusRequest.class)) { this.handler.addHandler(SharedConfigurationStatusRequest.class, new SharedConfigurationStatusRequestHandler()); @@ -1493,7 +1446,7 @@ public class InternalLocator extends Locator implements ConnectListener { } public boolean hasHandlerForClass(Class messageClass) { - return (handler.isHandled(messageClass)); + return this.handler.isHandled(messageClass); } }
http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index 8ae66d0..b45a8be 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -27,7 +27,7 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.*; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -77,13 +77,13 @@ import org.apache.geode.distributed.internal.membership.gms.SuspectMember; import org.apache.geode.distributed.internal.membership.gms.fd.GMSHealthMonitor; import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager; import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave; -import org.apache.geode.distributed.internal.membership.gms.messenger.GMSQuorumChecker; import org.apache.geode.internal.Assert; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.Version; import org.apache.geode.internal.admin.remote.RemoteTransportConfig; import org.apache.geode.internal.cache.CacheServerImpl; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.partitioned.PartitionMessageWithDirectReply; import org.apache.geode.internal.cache.xmlcache.CacheServerCreation; import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator; @@ -1550,7 +1550,7 @@ public class GMSMembershipManager implements MembershipManager, Manager { /** generate XML from the cache before shutting down due to forced disconnect */ public void saveCacheXmlForReconnect(boolean sharedConfigEnabled) { // first save the current cache description so reconnect can rebuild the cache - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { if (!Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "autoReconnect-useCacheXMLFile") && !sharedConfigEnabled) { http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index 9845bf1..86fe532 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -14,30 +14,6 @@ */ package org.apache.geode.distributed.internal.tcpserver; -import org.apache.geode.CancelException; -import org.apache.geode.DataSerializer; -import org.apache.geode.SystemFailure; -import org.apache.geode.distributed.internal.ClusterConfigurationService; -import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.distributed.internal.DistributionConfigImpl; -import org.apache.geode.distributed.internal.DistributionStats; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.distributed.internal.PoolStatHelper; -import org.apache.geode.distributed.internal.PooledExecutorWithDMStats; -import org.apache.geode.internal.DSFIDFactory; -import org.apache.geode.internal.GemFireVersion; -import org.apache.geode.internal.Version; -import org.apache.geode.internal.VersionedDataInputStream; -import org.apache.geode.internal.VersionedDataOutputStream; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.cache.tier.Acceptor; -import org.apache.geode.internal.cache.tier.sockets.HandShake; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.net.SocketCreator; -import org.apache.geode.internal.net.SocketCreatorFactory; -import org.apache.geode.internal.security.SecurableCommunicationChannel; -import org.apache.logging.log4j.Logger; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; @@ -59,8 +35,34 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import javax.net.ssl.SSLException; +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.DataSerializer; +import org.apache.geode.SystemFailure; +import org.apache.geode.distributed.internal.ClusterConfigurationService; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionConfigImpl; +import org.apache.geode.distributed.internal.DistributionStats; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.PoolStatHelper; +import org.apache.geode.distributed.internal.PooledExecutorWithDMStats; +import org.apache.geode.internal.DSFIDFactory; +import org.apache.geode.internal.GemFireVersion; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.VersionedDataInputStream; +import org.apache.geode.internal.VersionedDataOutputStream; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.sockets.HandShake; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.internal.net.SocketCreatorFactory; +import org.apache.geode.internal.security.SecurableCommunicationChannel; + /** * TCP server which listens on a port and delegates requests to a request handler. The server uses * expects messages containing a global version number, followed by a DataSerializable object @@ -99,20 +101,19 @@ public class TcpServer { public static int OLDTESTVERSION = OLDGOSSIPVERSION; public static final long SHUTDOWN_WAIT_TIME = 60 * 1000; - private static int MAX_POOL_SIZE = Integer - .getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.MAX_POOL_SIZE", 100).intValue(); + private static int MAX_POOL_SIZE = + Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.MAX_POOL_SIZE", 100); private static int POOL_IDLE_TIMEOUT = 60 * 1000; private static final Logger log = LogService.getLogger(); protected/* GemStoneAddition */ final/* GemStoneAddition */ static int READ_TIMEOUT = - Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.READ_TIMEOUT", 60 * 1000) - .intValue(); + Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.READ_TIMEOUT", 60 * 1000); // This is for backwards compatibility. The p2p.backlog flag used to be the only way to configure // the locator backlog. - private static final int P2P_BACKLOG = Integer.getInteger("p2p.backlog", 1000).intValue(); - private static final int BACKLOG = Integer - .getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.BACKLOG", P2P_BACKLOG).intValue(); + private static final int P2P_BACKLOG = Integer.getInteger("p2p.backlog", 1000); + private static final int BACKLOG = + Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.BACKLOG", P2P_BACKLOG); private final int port; private int serverSocketPortAtClose; @@ -129,7 +130,7 @@ public class TcpServer { private SocketCreator socketCreator; - /** + /* * GemStoneAddition - Initialize versions map. Warning: This map must be compatible with all * GemFire versions being handled by this member "With different GOSSIPVERION". If GOSSIPVERIONS * are same for then current GOSSIPVERSION should be used. @@ -189,15 +190,15 @@ public class TcpServer { POOL_IDLE_TIMEOUT, new ThreadPoolExecutor.CallerRunsPolicy()); } - public void restarting(InternalDistributedSystem ds, GemFireCacheImpl cache, + public void restarting(InternalDistributedSystem ds, InternalCache cache, ClusterConfigurationService sharedConfig) throws IOException { this.shuttingDown = false; this.handler.restarting(ds, cache, sharedConfig); startServerThread(); this.executor = createExecutor(this.poolHelper, this.threadGroup); - this.log.info("TcpServer@" + System.identityHashCode(this) - + " restarting: completed. Server thread=" + serverThread + "@" - + System.identityHashCode(serverThread) + ";alive=" + serverThread.isAlive()); + log.info("TcpServer@" + System.identityHashCode(this) + + " restarting: completed. Server thread=" + this.serverThread + '@' + + System.identityHashCode(this.serverThread) + ";alive=" + this.serverThread.isAlive()); } public void start() throws IOException { @@ -280,7 +281,7 @@ public class TcpServer { // Allocate no objects here! try { srv_sock.close(); - } catch (IOException e) { + } catch (IOException ignore) { // ignore } SystemFailure.checkFailure(); // throws @@ -318,7 +319,7 @@ public class TcpServer { executor.shutdown(); try { executor.awaitTermination(SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } handler.shutDown(); @@ -414,9 +415,9 @@ public class TcpServer { handler.endResponse(request, startTime); - } catch (EOFException ex) { + } catch (EOFException ignore) { // client went away - ignore - } catch (CancelException ex) { + } catch (CancelException ignore) { // ignore } catch (ClassNotFoundException ex) { String sender = null; @@ -460,7 +461,7 @@ public class TcpServer { } finally { try { sock.close(); - } catch (IOException e) { + } catch (IOException ignore) { // ignore } } @@ -511,8 +512,6 @@ public class TcpServer { /** * Returns GossipVersion for older Gemfire versions. * - * @param ordinal - * * @return gossip version */ public static int getGossipVersionForOrdinal(short ordinal) { @@ -525,12 +524,12 @@ public class TcpServer { Iterator<Map.Entry> itr = TcpServer.GOSSIP_TO_GEMFIRE_VERSION_MAP.entrySet().iterator(); while (itr.hasNext()) { Map.Entry entry = itr.next(); - short o = ((Short) entry.getValue()).shortValue(); + short o = (Short) entry.getValue(); if (o == ordinal) { - return ((Integer) entry.getKey()).intValue(); + return (Integer) entry.getKey(); } else if (o < ordinal && o > closest) { closest = o; - closestGV = ((Integer) entry.getKey()).intValue(); + closestGV = (Integer) entry.getKey(); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index c02dc47..7aeec03 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -203,6 +203,7 @@ import org.apache.geode.internal.cache.DestroyPartitionedRegionMessage; import org.apache.geode.internal.cache.DestroyRegionOperation; import org.apache.geode.internal.cache.DistTXPrecommitMessage; import org.apache.geode.internal.cache.DistTXCommitMessage; +import org.apache.geode.internal.cache.DistTXPrecommitMessage.DistTxPreCommitResponse; import org.apache.geode.internal.cache.DistTXRollbackMessage; import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage; import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionWithContextMessage; @@ -390,7 +391,6 @@ import org.apache.geode.internal.cache.versions.VMVersionTag; import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor; import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; -import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator; import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage; import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.BatchRemovalReplyMessage; import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage; @@ -913,8 +913,7 @@ public final class DSFIDFactory implements DataSerializableFixedID { registerDSFID(PR_QUERY_TRACE_INFO, PRQueryTraceInfo.class); registerDSFID(INDEX_CREATION_DATA, IndexCreationData.class); registerDSFID(DIST_TX_OP, DistTxEntryEvent.class); - registerDSFID(DIST_TX_PRE_COMMIT_RESPONSE, - DistTXPrecommitMessage.DistTxPrecommitResponse.class); + registerDSFID(DIST_TX_PRE_COMMIT_RESPONSE, DistTxPreCommitResponse.class); registerDSFID(DIST_TX_THIN_ENTRY_STATE, TXEntryState.DistTxThinEntryState.class); registerDSFID(SERVER_PING_MESSAGE, ServerPingMessage.class); registerDSFID(PR_DESTROY_ON_DATA_STORE_MESSAGE, DestroyRegionOnDataStoreMessage.class); http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java b/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java index acb7d22..820eb85 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java @@ -14,26 +14,16 @@ */ package org.apache.geode.internal; -import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner; -import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult; -import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.Declarable; -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionService; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.pdx.internal.TypeRegistry; -import org.apache.logging.log4j.Logger; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; +import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Files; @@ -46,6 +36,20 @@ import java.util.List; import java.util.Properties; import java.util.jar.JarEntry; import java.util.jar.JarInputStream; +import java.util.regex.Pattern; + +import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner; +import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult; +import org.apache.logging.log4j.Logger; + +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.Declarable; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.pdx.internal.TypeRegistry; /** * ClassLoader for a single JAR file. @@ -53,18 +57,21 @@ import java.util.jar.JarInputStream; * @since GemFire 7.0 */ public class DeployedJar { - private final static Logger logger = LogService.getLogger(); - private final static MessageDigest messageDigest = getMessageDigest(); + + private static final Logger logger = LogService.getLogger(); + private static final MessageDigest messageDigest = getMessageDigest(); + private static final byte[] ZERO_BYTES = new byte[0]; + private static final Pattern PATTERN_SLASH = Pattern.compile("/"); private final String jarName; private final File file; private final byte[] md5hash; - private final Collection<Function> registeredFunctions = new ArrayList<Function>(); + private final Collection<Function> registeredFunctions = new ArrayList<>(); private static MessageDigest getMessageDigest() { try { return MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException nsaex) { + } catch (NoSuchAlgorithmException ignored) { // Failure just means we can't do a simple compare for content equality } return null; @@ -75,7 +82,7 @@ public class DeployedJar { } public int getVersion() { - return JarDeployer.extractVersionFromFilename(file.getName()); + return JarDeployer.extractVersionFromFilename(this.file.getName()); } public DeployedJar(File versionedJarFile, String jarName) throws IOException { @@ -86,7 +93,7 @@ public class DeployedJar { * Writes the given jarBytes to versionedJarFile */ public DeployedJar(File versionedJarFile, final String jarName, byte[] jarBytes) - throws IOException { + throws FileNotFoundException { Assert.assertTrue(jarBytes != null, "jarBytes cannot be null"); Assert.assertTrue(jarName != null, "jarName cannot be null"); Assert.assertTrue(versionedJarFile != null, "versionedJarFile cannot be null"); @@ -124,13 +131,13 @@ public class DeployedJar { try { jarInputStream = new JarInputStream(inputStream); - valid = (jarInputStream.getNextJarEntry() != null); + valid = jarInputStream.getNextJarEntry() != null; } catch (IOException ignore) { // Ignore this exception and just return false } finally { try { jarInputStream.close(); - } catch (IOException ioex) { + } catch (IOException ignored) { // Ignore this exception and just return result } } @@ -144,11 +151,10 @@ public class DeployedJar { * @param jarBytes Bytes of data to be validated. * @return True if the data has JAR content, false otherwise */ - public static boolean hasValidJarContent(final byte[] jarBytes) { + static boolean hasValidJarContent(final byte[] jarBytes) { return hasValidJarContent(new ByteArrayInputStream(jarBytes)); } - /** * Scan the JAR file and attempt to load all classes and register any function classes found. */ @@ -158,7 +164,7 @@ public class DeployedJar { // in the constructor. Once this method is finished, all classes will have been loaded and // there will no longer be a need to hang on to the original contents so they will be // discarded. - public synchronized void loadClassesAndRegisterFunctions() throws ClassNotFoundException { + synchronized void loadClassesAndRegisterFunctions() throws ClassNotFoundException { final boolean isDebugEnabled = logger.isDebugEnabled(); if (isDebugEnabled) { logger.debug("Registering functions with DeployedJar: {}", this); @@ -175,8 +181,8 @@ public class DeployedJar { while (jarEntry != null) { if (jarEntry.getName().endsWith(".class")) { - final String className = jarEntry.getName().replaceAll("/", "\\.").substring(0, - (jarEntry.getName().length() - 6)); + final String className = PATTERN_SLASH.matcher(jarEntry.getName()).replaceAll("\\.") + .substring(0, jarEntry.getName().length() - 6); if (functionClasses.contains(className)) { if (isDebugEnabled) { @@ -228,12 +234,11 @@ public class DeployedJar { this.registeredFunctions.clear(); try { - TypeRegistry typeRegistry = - ((GemFireCacheImpl) CacheFactory.getAnyInstance()).getPdxRegistry(); + TypeRegistry typeRegistry = ((InternalCache) CacheFactory.getAnyInstance()).getPdxRegistry(); if (typeRegistry != null) { typeRegistry.flushCache(); } - } catch (CacheClosedException ccex) { + } catch (CacheClosedException ignored) { // That's okay, it just means there was nothing to flush to begin with } } @@ -245,7 +250,7 @@ public class DeployedJar { * @param compareToBytes Bytes to compare the original content to * @return True of the MD5 hash is the same o */ - public boolean hasSameContentAs(final byte[] compareToBytes) { + boolean hasSameContentAs(final byte[] compareToBytes) { // If the MD5 hash can't be calculated then silently return no match if (messageDigest == null || this.md5hash == null) { return Arrays.equals(compareToBytes, getJarContent()); @@ -268,16 +273,15 @@ public class DeployedJar { * @return A collection of Objects that implement the Function interface. */ private Collection<Function> getRegisterableFunctionsFromClass(Class<?> clazz) { - final List<Function> registerableFunctions = new ArrayList<Function>(); + final List<Function> registerableFunctions = new ArrayList<>(); try { if (Function.class.isAssignableFrom(clazz) && !Modifier.isAbstract(clazz.getModifiers())) { boolean registerUninitializedFunction = true; if (Declarable.class.isAssignableFrom(clazz)) { try { - final List<Properties> propertiesList = - ((GemFireCacheImpl) CacheFactory.getAnyInstance()) - .getDeclarableProperties(clazz.getName()); + final List<Properties> propertiesList = ((InternalCache) CacheFactory.getAnyInstance()) + .getDeclarableProperties(clazz.getName()); if (!propertiesList.isEmpty()) { registerUninitializedFunction = false; @@ -295,7 +299,7 @@ public class DeployedJar { } } } - } catch (CacheClosedException ccex) { + } catch (CacheClosedException ignored) { // That's okay, it just means there were no properties to init the function with } } @@ -309,7 +313,7 @@ public class DeployedJar { } } } catch (Exception ex) { - logger.error("Attempting to register function from JAR file: " + this.file.getAbsolutePath(), + logger.error("Attempting to register function from JAR file: {}", this.file.getAbsolutePath(), ex); } @@ -349,15 +353,14 @@ public class DeployedJar { } private byte[] getJarContent() { - InputStream channelInputStream = null; try { - channelInputStream = new FileInputStream(this.file); + InputStream channelInputStream = new FileInputStream(this.file); final ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); final byte[] bytes = new byte[4096]; int bytesRead; - while (((bytesRead = channelInputStream.read(bytes)) != -1)) { + while ((bytesRead = channelInputStream.read(bytes)) != -1) { byteOutStream.write(bytes, 0, bytesRead); } channelInputStream.close(); @@ -366,7 +369,7 @@ public class DeployedJar { logger.error("Error when attempting to read jar contents: ", e); } - return new byte[0]; + return ZERO_BYTES; } /** @@ -387,8 +390,8 @@ public class DeployedJar { public URL getFileURL() { try { return this.file.toURL(); - } catch (IOException e) { - e.printStackTrace(); + } catch (MalformedURLException e) { + logger.warn(e); } return null; } @@ -397,7 +400,7 @@ public class DeployedJar { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((this.jarName == null) ? 0 : this.jarName.hashCode()); + result = prime * result + (this.jarName == null ? 0 : this.jarName.hashCode()); return result; } @@ -426,12 +429,12 @@ public class DeployedJar { @Override public String toString() { final StringBuilder sb = new StringBuilder(getClass().getName()); - sb.append("@").append(System.identityHashCode(this)).append("{"); + sb.append('@').append(System.identityHashCode(this)).append('{'); sb.append("jarName=").append(this.jarName); sb.append(",file=").append(this.file.getAbsolutePath()); sb.append(",md5hash=").append(Arrays.toString(this.md5hash)); sb.append(",version=").append(this.getVersion()); - sb.append("}"); + sb.append('}'); return sb.toString(); } }