Repository: geode
Updated Branches:
  refs/heads/develop d2ddfd57a -> 017db36d2


GEODE-2257 Client configured to use locator with addPoolServer fails to connect

My previous commit did not include the changes to TcpServer required for the
test to pass.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/017db36d
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/017db36d
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/017db36d

Branch: refs/heads/develop
Commit: 017db36d2fc79b68b0a3faac70ad0f68fb0e33a3
Parents: d2ddfd5
Author: Bruce Schuchardt <[email protected]>
Authored: Tue Jan 3 09:02:00 2017 -0800
Committer: Bruce Schuchardt <[email protected]>
Committed: Tue Jan 3 09:02:00 2017 -0800

----------------------------------------------------------------------
 .../internal/tcpserver/TcpServer.java           | 324 +++++++++----------
 .../tier/sockets/ClientServerMiscDUnitTest.java |   2 -
 2 files changed, 152 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/017db36d/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 83fdd0b..461d3ac 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -14,6 +14,30 @@
  */
 package org.apache.geode.distributed.internal.tcpserver;
 
+import org.apache.geode.CancelException;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.distributed.internal.DistributionStats;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.PoolStatHelper;
+import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
+import org.apache.geode.distributed.internal.SharedConfiguration;
+import org.apache.geode.internal.DSFIDFactory;
+import org.apache.geode.internal.GemFireVersion;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.VersionedDataInputStream;
+import org.apache.geode.internal.VersionedDataOutputStream;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.sockets.HandShake;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -37,29 +61,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.ssl.SSLException;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelException;
-import org.apache.geode.DataSerializer;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
-import org.apache.geode.distributed.internal.DistributionStats;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.PoolStatHelper;
-import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
-import org.apache.geode.distributed.internal.SharedConfiguration;
-import org.apache.geode.internal.DSFIDFactory;
-import org.apache.geode.internal.GemFireVersion;
-import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
-import org.apache.geode.internal.VersionedDataOutputStream;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.SocketCreator;
-import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.internal.security.SecurableCommunicationChannel;
-
 /**
  * TCP server which listens on a port and delegates requests to a request 
handler. The server uses
  * expects messages containing a global version number, followed by a 
DataSerializable object
@@ -113,13 +114,13 @@ public class TcpServer {
   private static final int BACKLOG = Integer
       .getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.BACKLOG", 
P2P_BACKLOG).intValue();
 
-  // private int port=7500;
   private final int port;
-  private/* GemStoneAddition */ ServerSocket srv_sock = null;
+  private int serverSocketPortAtClose;
+  private ServerSocket srv_sock = null;
   private InetAddress bind_address;
   private volatile boolean shuttingDown = false; // GemStoneAddition
   private final PoolStatHelper poolHelper;
-  private/* GemStoneAddition */ final TcpHandler handler;
+  private final TcpHandler handler;
 
   private PooledExecutorWithDMStats executor;
   private final ThreadGroup threadGroup;
@@ -262,14 +263,13 @@ public class TcpServer {
    * Returns the value of the bound port. If the server was initialized with a 
port of 0 indicating
    * that any ephemeral port should be used, this method will return the 
actual bound port.
    * 
-   * @return the port bound to this socket or 0 if the socket is closed or 
otherwise not connected
+   * @return the locator's tcp/ip port. This will be zero if the locator hasn't been started.
    */
   public int getPort() {
     if (srv_sock != null && !srv_sock.isClosed()) {
       return srv_sock.getLocalPort();
     }
-
-    return 0;
+    return serverSocketPortAtClose;
   }
 
   protected void run() {
@@ -334,167 +334,147 @@ public class TcpServer {
    * synchronized in processGossip.
    */
   private void processRequest(final Socket sock) {
-    Runnable clientTask = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        long startTime = DistributionStats.getStatTime();
-        DataInputStream input = null;
-        Object request, response;
+    executor.execute(() -> {
+      long startTime = DistributionStats.getStatTime();
+      DataInputStream input = null;
+      Object request, response;
+      try {
+        socketCreator.configureServerSSLSocket(sock);
+        sock.setSoTimeout(READ_TIMEOUT);
         try {
-          socketCreator.configureServerSSLSocket(sock);
-          // if(log.isInfoEnabled()) log.info("accepted connection from " +
-          // sock.getInetAddress() +
-          // ':' + sock.getPort());
-          // sock.setSoLinger(true, 500);
-          sock.setSoTimeout(READ_TIMEOUT);
-          try {
-            input = new DataInputStream(sock.getInputStream());
-          } catch (StreamCorruptedException e) {
-            // bug 36679: Some garbage can be left on the socket stream
-            // if a peer disappears at exactly the wrong moment.
-            log.debug("Discarding illegal request from "
-                + (sock.getInetAddress().getHostAddress() + ":" + 
sock.getPort()), e);
-            return;
-          }
+          input = new DataInputStream(sock.getInputStream());
+        } catch (StreamCorruptedException e) {
+          // Some garbage can be left on the socket stream
+          // if a peer disappears at exactly the wrong moment.
+          log.debug("Discarding illegal request from "
+              + (sock.getInetAddress().getHostAddress() + ":" + sock.getPort()), e);
+          return;
+        }
+
+        int gossipVersion = readGossipVersion(sock, input);
 
-          int gossipVersion = input.readInt();
-          short versionOrdinal = Version.CURRENT_ORDINAL;
+        short versionOrdinal;
+        if (gossipVersion <= getCurrentGossipVersion()
+            && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
           // Create a versioned stream to remember sender's GemFire version
-          if (gossipVersion <= getCurrentGossipVersion()
-              && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
-            versionOrdinal = (short) 
GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
-            // if (gossipVersion < getCurrentGossipVersion()) {
-            // if (log.isTraceEnabled()) {
-            // log.debug(
-            // "Received request from "
-            // + sock.getInetAddress().getHostAddress()
-            // + " This locator is running: " + getCurrentGossipVersion()
-            // + ", but request was version: " + gossipVersion
-            // + ", version ordinal: " + versionOrdinal);
-            // }
-            // }
-          }
+          versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
+        } else {
           // Close the socket. We can not accept requests from a newer version
-          else {
-            sock.close();
-            return;
-          }
-          // Get exactly correct version of client from inputstream.
-          if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
-            versionOrdinal = input.readShort();
-          }
-
-          if (log.isDebugEnabled() && versionOrdinal != 
Version.CURRENT_ORDINAL) {
-            log.debug("Locator reading request from " + sock.getInetAddress() 
+ " with version "
-                + Version.fromOrdinal(versionOrdinal, false));
-          }
-          input = new VersionedDataInputStream(input, 
Version.fromOrdinal(versionOrdinal, false));
-          request = DataSerializer.readObject(input);
-          if (log.isDebugEnabled()) {
-            log.debug("Locator received request " + request + " from " + 
sock.getInetAddress());
-          }
-          if (request instanceof ShutdownRequest) {
-            shuttingDown = true;
-            // Don't call shutdown from within the worker thread, see java bug 
#6576792. This bug
-            // exists
-            // in the backport as well. Closing the socket will cause our 
acceptor thread to
-            // shutdown
-            // the executor
-            // executor.shutdown();
-            srv_sock.close();
-            response = new ShutdownResponse();
-          } else if (request instanceof InfoRequest) {
-            response = handleInfoRequest(request);
-          } else if (request instanceof VersionRequest) {
-            response = handleVersionRequest(request);
-          } else {
-            response = handler.processRequest(request);
-          }
+          sock.close();
+          return;
+        }
+        if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
+          // Recent versions of TcpClient will send the version ordinal
+          versionOrdinal = input.readShort();
+        }
 
-          handler.endRequest(request, startTime);
+        if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
+          log.debug("Locator reading request from " + sock.getInetAddress() + " with version "
+              + Version.fromOrdinal(versionOrdinal, false));
+        }
+        input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false));
+        request = DataSerializer.readObject(input);
+        if (log.isDebugEnabled()) {
+          log.debug("Locator received request " + request + " from " + sock.getInetAddress());
+        }
+        if (request instanceof ShutdownRequest) {
+          shuttingDown = true;
+          // Don't call shutdown from within the worker thread, see java bug #6576792.
+          // Closing the socket will cause our acceptor thread to shutdown the executor
+          this.serverSocketPortAtClose = srv_sock.getLocalPort();
+          srv_sock.close();
+          response = new ShutdownResponse();
+        } else if (request instanceof InfoRequest) {
+          response = handleInfoRequest(request);
+        } else if (request instanceof VersionRequest) {
+          response = handleVersionRequest(request);
+        } else {
+          response = handler.processRequest(request);
+        }
 
-          startTime = DistributionStats.getStatTime();
-          if (response != null) {
-            DataOutputStream output = new 
DataOutputStream(sock.getOutputStream());
-            if (versionOrdinal != Version.CURRENT_ORDINAL) {
-              output =
-                  new VersionedDataOutputStream(output, 
Version.fromOrdinal(versionOrdinal, false));
-            }
-            DataSerializer.writeObject(response, output);
+        handler.endRequest(request, startTime);
 
-            output.flush();
+        startTime = DistributionStats.getStatTime();
+        if (response != null) {
+          DataOutputStream output = new DataOutputStream(sock.getOutputStream());
+          if (versionOrdinal != Version.CURRENT_ORDINAL) {
+            output =
+                new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false));
           }
+          DataSerializer.writeObject(response, output);
+          output.flush();
+        }
 
-          handler.endResponse(request, startTime);
+        handler.endResponse(request, startTime);
 
-        } catch (EOFException ex) {
-          // client went away - ignore
-        } catch (CancelException ex) {
-          // ignore
-        } catch (ClassNotFoundException ex) {
-          String sender = null;
-          if (sock != null) {
-            sender = sock.getInetAddress().getHostAddress();
-          }
-          log.info("Unable to process request from " + sender + " exception=" 
+ ex.getMessage());
-        } catch (Exception ex) {
-          String sender = null;
-          if (sock != null) {
-            sender = sock.getInetAddress().getHostAddress();
-          }
-          if (ex instanceof IOException) {
-            // IOException could be caused by a client failure. Don't
-            // log with severe.
-            if (!sock.isClosed()) {
-              log.info("Exception in processing request from " + sender, ex);
-            }
-          } else {
-            log.fatal("Exception in processing request from " + sender, ex);
+      } catch (EOFException ex) {
+        // client went away - ignore
+      } catch (CancelException ex) {
+        // ignore
+      } catch (ClassNotFoundException ex) {
+        String sender = null;
+        if (sock != null) {
+          sender = sock.getInetAddress().getHostAddress();
+        }
+        log.info("Unable to process request from " + sender + " exception=" + ex.getMessage());
+      } catch (Exception ex) {
+        String sender = null;
+        if (sock != null) {
+          sender = sock.getInetAddress().getHostAddress();
+        }
+        if (ex instanceof IOException) {
+          // IOException could be caused by a client failure. Don't
+          // log with severe.
+          if (!sock.isClosed()) {
+            log.info("Exception in processing request from " + sender, ex);
           }
+        } else {
+          log.fatal("Exception in processing request from " + sender, ex);
+        }
 
+      } catch (VirtualMachineError err) {
+        SystemFailure.initiateFailure(err);
+        throw err;
+      } catch (Throwable ex) {
+        SystemFailure.checkFailure();
+        String sender = null;
+        if (sock != null) {
+          sender = sock.getInetAddress().getHostAddress();
+        }
+        try {
+          log.fatal("Exception in processing request from " + sender, ex);
         } catch (VirtualMachineError err) {
           SystemFailure.initiateFailure(err);
-          // If this ever returns, rethrow the error. We're poisoned
-          // now, so don't let this thread continue.
           throw err;
-        } catch (Throwable ex) {
-          // Whenever you catch Error or Throwable, you must also
-          // catch VirtualMachineError (see above). However, there is
-          // _still_ a possibility that you are dealing with a cascading
-          // error condition, so you also need to check to see if the JVM
-          // is still usable:
+        } catch (Throwable t) {
           SystemFailure.checkFailure();
-          String sender = null;
-          if (sock != null) {
-            sender = sock.getInetAddress().getHostAddress();
-          }
-          try {
-            log.fatal("Exception in processing request from " + sender, ex);
-          } catch (VirtualMachineError err) {
-            SystemFailure.initiateFailure(err);
-            // If this ever returns, rethrow the error. We're poisoned
-            // now, so don't let this thread continue.
-            throw err;
-          } catch (Throwable t) {
-            // Whenever you catch Error or Throwable, you must also
-            // catch VirtualMachineError (see above). However, there is
-            // _still_ a possibility that you are dealing with a cascading
-            // error condition, so you also need to check to see if the JVM
-            // is still usable:
-            SystemFailure.checkFailure();
-            // for surviving and debugging exceptions getting the logger
-            t.printStackTrace();
-          }
-        } finally {
-          try {
-            sock.close();
-          } catch (IOException e) {
-            // ignore
-          }
+          t.printStackTrace();
+        }
+      } finally {
+        try {
+          sock.close();
+        } catch (IOException e) {
+          // ignore
         }
       }
-    };
-    executor.execute(clientTask);
+    });
+  }
+
+  private int readGossipVersion(Socket sock, DataInputStream input) throws Exception {
+    // read the first byte & check for an improperly configured client pool trying
+    // to contact a cache server
+    int firstByte = input.readUnsignedByte();
+    if (firstByte >= Acceptor.CLIENT_TO_SERVER) {
+      sock.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR);
+      throw new Exception("Improperly configured client detected - use addPoolLocator to "
+          + "configure its locators instead of addPoolServer.");
+    }
+
+    int gossipVersion = firstByte;
+    for (int i = 0; i < 3; i++) {
+      gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte());
+    }
+    return gossipVersion;
   }
 
   protected Object handleInfoRequest(Object request) {

http://git-wip-us.apache.org/repos/asf/geode/blob/017db36d/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
index 0be1471..391653c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
@@ -26,7 +26,6 @@ import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.junit.categories.ClientServerTest;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -748,7 +747,6 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
     }
   }
 
-  @Ignore
   @Test(expected = GemFireConfigException.class)
   public void clientIsPreventedFromConnectingToLocatorAsServer() throws Exception {
     IgnoredException.addIgnoredException("Improperly configured client detected");

Reply via email to