Repository: geode Updated Branches: refs/heads/develop d2ddfd57a -> 017db36d2
GEODE-2257 Client configured to use locator with addPoolServer fails to connect My previous commit did not include the changes to TcpServer required for the test to pass. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/017db36d Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/017db36d Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/017db36d Branch: refs/heads/develop Commit: 017db36d2fc79b68b0a3faac70ad0f68fb0e33a3 Parents: d2ddfd5 Author: Bruce Schuchardt <[email protected]> Authored: Tue Jan 3 09:02:00 2017 -0800 Committer: Bruce Schuchardt <[email protected]> Committed: Tue Jan 3 09:02:00 2017 -0800 ---------------------------------------------------------------------- .../internal/tcpserver/TcpServer.java | 324 +++++++++---------- .../tier/sockets/ClientServerMiscDUnitTest.java | 2 - 2 files changed, 152 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/017db36d/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index 83fdd0b..461d3ac 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -14,6 +14,30 @@ */ package org.apache.geode.distributed.internal.tcpserver; +import org.apache.geode.CancelException; +import org.apache.geode.DataSerializer; +import org.apache.geode.SystemFailure; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionConfigImpl; +import org.apache.geode.distributed.internal.DistributionStats; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.PoolStatHelper; +import org.apache.geode.distributed.internal.PooledExecutorWithDMStats; +import org.apache.geode.distributed.internal.SharedConfiguration; +import org.apache.geode.internal.DSFIDFactory; +import org.apache.geode.internal.GemFireVersion; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.VersionedDataInputStream; +import org.apache.geode.internal.VersionedDataOutputStream; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.sockets.HandShake; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.internal.net.SocketCreatorFactory; +import org.apache.geode.internal.security.SecurableCommunicationChannel; +import org.apache.logging.log4j.Logger; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; @@ -37,29 +61,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; -import org.apache.logging.log4j.Logger; - -import org.apache.geode.CancelException; -import org.apache.geode.DataSerializer; -import org.apache.geode.SystemFailure; -import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.distributed.internal.DistributionConfigImpl; -import org.apache.geode.distributed.internal.DistributionStats; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.distributed.internal.PoolStatHelper; -import org.apache.geode.distributed.internal.PooledExecutorWithDMStats; -import org.apache.geode.distributed.internal.SharedConfiguration; -import org.apache.geode.internal.DSFIDFactory; -import org.apache.geode.internal.GemFireVersion; -import org.apache.geode.internal.Version; -import org.apache.geode.internal.VersionedDataInputStream; -import org.apache.geode.internal.VersionedDataOutputStream; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.net.SocketCreator; -import org.apache.geode.internal.net.SocketCreatorFactory; -import org.apache.geode.internal.security.SecurableCommunicationChannel; - /** * TCP server which listens on a port and delegates requests to a request handler. The server uses * expects messages containing a global version number, followed by a DataSerializable object @@ -113,13 +114,13 @@ public class TcpServer { private static final int BACKLOG = Integer .getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.BACKLOG", P2P_BACKLOG).intValue(); - // private int port=7500; private final int port; - private/* GemStoneAddition */ ServerSocket srv_sock = null; + private int serverSocketPortAtClose; + private ServerSocket srv_sock = null; private InetAddress bind_address; private volatile boolean shuttingDown = false; // GemStoneAddition private final PoolStatHelper poolHelper; - private/* GemStoneAddition */ final TcpHandler handler; + private final TcpHandler handler; private PooledExecutorWithDMStats executor; private final ThreadGroup threadGroup; @@ -262,14 +263,13 @@ public class TcpServer { * Returns the value of the bound port. If the server was initialized with a port of 0 indicating * that any ephemeral port should be used, this method will return the actual bound port. * - * @return the port bound to this socket or 0 if the socket is closed or otherwise not connected + * @return the locator's tcp/ip port. This will be zero if the locator hasn't been started. */ public int getPort() { if (srv_sock != null && !srv_sock.isClosed()) { return srv_sock.getLocalPort(); } - - return 0; + return serverSocketPortAtClose; } protected void run() { @@ -334,167 +334,147 @@ public class TcpServer { * synchronized in processGossip. */ private void processRequest(final Socket sock) { - Runnable clientTask = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - long startTime = DistributionStats.getStatTime(); - DataInputStream input = null; - Object request, response; + executor.execute(() -> { + long startTime = DistributionStats.getStatTime(); + DataInputStream input = null; + Object request, response; + try { + socketCreator.configureServerSSLSocket(sock); + sock.setSoTimeout(READ_TIMEOUT); try { - socketCreator.configureServerSSLSocket(sock); - // if(log.isInfoEnabled()) log.info("accepted connection from " + - // sock.getInetAddress() + - // ':' + sock.getPort()); - // sock.setSoLinger(true, 500); - sock.setSoTimeout(READ_TIMEOUT); - try { - input = new DataInputStream(sock.getInputStream()); - } catch (StreamCorruptedException e) { - // bug 36679: Some garbage can be left on the socket stream - // if a peer disappears at exactly the wrong moment. - log.debug("Discarding illegal request from " - + (sock.getInetAddress().getHostAddress() + ":" + sock.getPort()), e); - return; - } + input = new DataInputStream(sock.getInputStream()); + } catch (StreamCorruptedException e) { + // Some garbage can be left on the socket stream + // if a peer disappears at exactly the wrong moment. + log.debug("Discarding illegal request from " + + (sock.getInetAddress().getHostAddress() + ":" + sock.getPort()), e); + return; + } + + int gossipVersion = readGossipVersion(sock, input); - int gossipVersion = input.readInt(); - short versionOrdinal = Version.CURRENT_ORDINAL; + short versionOrdinal; + if (gossipVersion <= getCurrentGossipVersion() + && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) { // Create a versioned stream to remember sender's GemFire version - if (gossipVersion <= getCurrentGossipVersion() - && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) { - versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion); - // if (gossipVersion < getCurrentGossipVersion()) { - // if (log.isTraceEnabled()) { - // log.debug( - // "Received request from " - // + sock.getInetAddress().getHostAddress() - // + " This locator is running: " + getCurrentGossipVersion() - // + ", but request was version: " + gossipVersion - // + ", version ordinal: " + versionOrdinal); - // } - // } - } + versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion); + } else { // Close the socket. We can not accept requests from a newer version - else { - sock.close(); - return; - } - // Get exactly correct version of client from inputstream. - if (Version.GFE_71.compareTo(versionOrdinal) <= 0) { - versionOrdinal = input.readShort(); - } - - if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { - log.debug("Locator reading request from " + sock.getInetAddress() + " with version " - + Version.fromOrdinal(versionOrdinal, false)); - } - input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); - request = DataSerializer.readObject(input); - if (log.isDebugEnabled()) { - log.debug("Locator received request " + request + " from " + sock.getInetAddress()); - } - if (request instanceof ShutdownRequest) { - shuttingDown = true; - // Don't call shutdown from within the worker thread, see java bug #6576792. This bug - // exists - // in the backport as well. Closing the socket will cause our acceptor thread to - // shutdown - // the executor - // executor.shutdown(); - srv_sock.close(); - response = new ShutdownResponse(); - } else if (request instanceof InfoRequest) { - response = handleInfoRequest(request); - } else if (request instanceof VersionRequest) { - response = handleVersionRequest(request); - } else { - response = handler.processRequest(request); - } + sock.close(); + return; + } + if (Version.GFE_71.compareTo(versionOrdinal) <= 0) { + // Recent versions of TcpClient will send the version ordinal + versionOrdinal = input.readShort(); + } - handler.endRequest(request, startTime); + if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { + log.debug("Locator reading request from " + sock.getInetAddress() + " with version " + + Version.fromOrdinal(versionOrdinal, false)); + } + input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); + request = DataSerializer.readObject(input); + if (log.isDebugEnabled()) { + log.debug("Locator received request " + request + " from " + sock.getInetAddress()); + } + if (request instanceof ShutdownRequest) { + shuttingDown = true; + // Don't call shutdown from within the worker thread, see java bug #6576792. + // Closing the socket will cause our acceptor thread to shutdown the executor + this.serverSocketPortAtClose = srv_sock.getLocalPort(); + srv_sock.close(); + response = new ShutdownResponse(); + } else if (request instanceof InfoRequest) { + response = handleInfoRequest(request); + } else if (request instanceof VersionRequest) { + response = handleVersionRequest(request); + } else { + response = handler.processRequest(request); + } - startTime = DistributionStats.getStatTime(); - if (response != null) { - DataOutputStream output = new DataOutputStream(sock.getOutputStream()); - if (versionOrdinal != Version.CURRENT_ORDINAL) { - output = - new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); - } - DataSerializer.writeObject(response, output); + handler.endRequest(request, startTime); - output.flush(); + startTime = DistributionStats.getStatTime(); + if (response != null) { + DataOutputStream output = new DataOutputStream(sock.getOutputStream()); + if (versionOrdinal != Version.CURRENT_ORDINAL) { + output = + new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); } + DataSerializer.writeObject(response, output); + output.flush(); + } - handler.endResponse(request, startTime); + handler.endResponse(request, startTime); - } catch (EOFException ex) { - // client went away - ignore - } catch (CancelException ex) { - // ignore - } catch (ClassNotFoundException ex) { - String sender = null; - if (sock != null) { - sender = sock.getInetAddress().getHostAddress(); - } - log.info("Unable to process request from " + sender + " exception=" + ex.getMessage()); - } catch (Exception ex) { - String sender = null; - if (sock != null) { - sender = sock.getInetAddress().getHostAddress(); - } - if (ex instanceof IOException) { - // IOException could be caused by a client failure. Don't - // log with severe. - if (!sock.isClosed()) { - log.info("Exception in processing request from " + sender, ex); - } - } else { - log.fatal("Exception in processing request from " + sender, ex); + } catch (EOFException ex) { + // client went away - ignore + } catch (CancelException ex) { + // ignore + } catch (ClassNotFoundException ex) { + String sender = null; + if (sock != null) { + sender = sock.getInetAddress().getHostAddress(); + } + log.info("Unable to process request from " + sender + " exception=" + ex.getMessage()); + } catch (Exception ex) { + String sender = null; + if (sock != null) { + sender = sock.getInetAddress().getHostAddress(); + } + if (ex instanceof IOException) { + // IOException could be caused by a client failure. Don't + // log with severe. + if (!sock.isClosed()) { + log.info("Exception in processing request from " + sender, ex); } + } else { + log.fatal("Exception in processing request from " + sender, ex); + } + } catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + throw err; + } catch (Throwable ex) { + SystemFailure.checkFailure(); + String sender = null; + if (sock != null) { + sender = sock.getInetAddress().getHostAddress(); + } + try { + log.fatal("Exception in processing request from " + sender, ex); } catch (VirtualMachineError err) { SystemFailure.initiateFailure(err); - // If this ever returns, rethrow the error. We're poisoned - // now, so don't let this thread continue. throw err; - } catch (Throwable ex) { - // Whenever you catch Error or Throwable, you must also - // catch VirtualMachineError (see above). However, there is - // _still_ a possibility that you are dealing with a cascading - // error condition, so you also need to check to see if the JVM - // is still usable: + } catch (Throwable t) { SystemFailure.checkFailure(); - String sender = null; - if (sock != null) { - sender = sock.getInetAddress().getHostAddress(); - } - try { - log.fatal("Exception in processing request from " + sender, ex); - } catch (VirtualMachineError err) { - SystemFailure.initiateFailure(err); - // If this ever returns, rethrow the error. We're poisoned - // now, so don't let this thread continue. - throw err; - } catch (Throwable t) { - // Whenever you catch Error or Throwable, you must also - // catch VirtualMachineError (see above). However, there is - // _still_ a possibility that you are dealing with a cascading - // error condition, so you also need to check to see if the JVM - // is still usable: - SystemFailure.checkFailure(); - // for surviving and debugging exceptions getting the logger - t.printStackTrace(); - } - } finally { - try { - sock.close(); - } catch (IOException e) { - // ignore - } + t.printStackTrace(); + } + } finally { + try { + sock.close(); + } catch (IOException e) { + // ignore } } - }; - executor.execute(clientTask); + }); + } + + private int readGossipVersion(Socket sock, DataInputStream input) throws Exception { + // read the first byte & check for an improperly configured client pool trying + // to contact a cache server + int firstByte = input.readUnsignedByte(); + if (firstByte >= Acceptor.CLIENT_TO_SERVER) { + sock.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR); + throw new Exception("Improperly configured client detected - use addPoolLocator to " + + "configure its locators instead of addPoolServer."); + } + + int gossipVersion = firstByte; + for (int i = 0; i < 3; i++) { + gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte()); + } + return gossipVersion; } protected Object handleInfoRequest(Object request) { http://git-wip-us.apache.org/repos/asf/geode/blob/017db36d/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java index 0be1471..391653c 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java @@ -26,7 +26,6 @@ import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.junit.categories.ClientServerTest; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -748,7 +747,6 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase { } } - @Ignore @Test(expected = GemFireConfigException.class) public void clientIsPreventedFromConnectingToLocatorAsServer() throws Exception { IgnoredException.addIgnoredException("Improperly configured client detected");
