GEODE-3075: changes in response to feedback; refactor some.

`AcceptorImpl.handleNewClientConnection` has had refactoring and should
be a bit more understandable.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/c71c28df
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/c71c28df
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/c71c28df

Branch: refs/heads/feature/GEODE-3109
Commit: c71c28dff7fd6d20798fadcbd6a1a15f055ae52e
Parents: cdcc4d9
Author: Galen OSullivan <gosulli...@pivotal.io>
Authored: Mon Jun 19 17:48:53 2017 -0700
Committer: Hitesh Khamesra <hkhame...@pivotal.io>
Committed: Mon Jun 26 09:26:22 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/tier/Acceptor.java     |   2 +-
 .../cache/tier/sockets/AcceptorImpl.java        | 316 ++++++++++---------
 .../GenericProtocolServerConnection.java        |  85 +++++
 .../sockets/NewProtocolServerConnection.java    |  87 -----
 .../tier/sockets/ServerConnectionFactory.java   |   9 +-
 .../ServerConnectionFactoryIntegrationTest.java |  45 ++-
 .../sockets/ServerConnectionFactoryTest.java    |  15 +-
 7 files changed, 291 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index a95195a..e12a409 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -74,7 +74,7 @@ public abstract class Acceptor {
   /**
    * For the new client-server protocol, which ignores the usual handshake 
mechanism.
    */
-  public static final byte CLIENT_TO_SERVER_NEW_PROTOCOL = (byte) 110;
+  public static final byte PROTOBUF_CLIENT_SERVER_PROTOCOL = (byte) 110;
 
   /**
    * The GFE version of the server.

http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 24efc93..50f7006 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -94,7 +94,7 @@ import javax.net.ssl.SSLException;
 /**
  * Implements the acceptor thread on the bridge server. Accepts connections 
from the edge and starts
  * up threads to process requests from these.
- * 
+ *
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
@@ -113,19 +113,29 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    */
   private final ThreadPoolExecutor hsPool;
 
-  /** The port on which this acceptor listens for client connections */
+  /**
+   * The port on which this acceptor listens for client connections
+   */
   private final int localPort;
 
-  /** The server socket that handles requests for connections */
+  /**
+   * The server socket that handles requests for connections
+   */
   private ServerSocket serverSock = null;
 
-  /** The GemFire cache served up by this acceptor */
+  /**
+   * The GemFire cache served up by this acceptor
+   */
   protected final InternalCache cache;
 
-  /** Caches region information */
+  /**
+   * Caches region information
+   */
   private final CachedRegionHelper crHelper;
 
-  /** A lock to prevent close from occurring while creating a ServerConnection 
*/
+  /**
+   * A lock to prevent close from occurring while creating a ServerConnection
+   */
   private final Object syncLock = new Object();
 
   /**
@@ -165,7 +175,9 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    */
   public static final int DEFAULT_HANDSHAKE_TIMEOUT_MS = 59000;
 
-  /** Test value for handshake timeout */
+  /**
+   * Test value for handshake timeout
+   */
   protected static final int handShakeTimeout =
       Integer.getInteger(HANDSHAKE_TIMEOUT_PROPERTY_NAME, 
DEFAULT_HANDSHAKE_TIMEOUT_MS).intValue();
 
@@ -180,17 +192,25 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    */
   public static final int DEFAULT_ACCEPT_TIMEOUT_MS = 9900;
 
-  /** Test value for accept timeout */
+  /**
+   * Test value for accept timeout
+   */
   private final int acceptTimeout =
       Integer.getInteger(ACCEPT_TIMEOUT_PROPERTY_NAME, 
DEFAULT_ACCEPT_TIMEOUT_MS).intValue();
 
-  /** The mininum value of max-connections */
+  /**
+   * The mininum value of max-connections
+   */
   public static final int MINIMUM_MAX_CONNECTIONS = 16;
 
-  /** The buffer size for server-side sockets. */
+  /**
+   * The buffer size for server-side sockets.
+   */
   private final int socketBufferSize;
 
-  /** Notifies clients of updates */
+  /**
+   * Notifies clients of updates
+   */
   private CacheClientNotifier clientNotifier;
 
   /**
@@ -198,7 +218,9 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    */
   private static final int DEFAULT_BACKLOG = 1000;
 
-  /** The system property name for setting the {@link ServerSocket}backlog */
+  /**
+   * The system property name for setting the {@link ServerSocket}backlog
+   */
   public static final String BACKLOG_PROPERTY_NAME = "BridgeServer.backlog";
 
   /**
@@ -206,13 +228,19 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    */
   public final AtomicInteger clientServerCnxCount = new AtomicInteger();
 
-  /** Has this acceptor been shut down */
+  /**
+   * Has this acceptor been shut down
+   */
   private volatile boolean shutdownStarted = false;
 
-  /** The thread that runs the acceptor */
+  /**
+   * The thread that runs the acceptor
+   */
   private Thread thread = null;
 
-  /** The thread that runs the selector loop if any */
+  /**
+   * The thread that runs the selector loop if any
+   */
   private Thread selectorThread = null;
 
   /**
@@ -222,16 +250,16 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
 
   /**
    * List of ServerConnection.
-   * 
+   *
    * Instances added when constructed; removed when terminated.
-   * 
+   *
    * guarded.By {@link #allSCsLock}
    */
   private final HashSet allSCs = new HashSet();
 
   /**
    * List of ServerConnections, for {@link #emergencyClose()}
-   * 
+   *
    * guarded.By {@link #allSCsLock}
    */
   private volatile ServerConnection allSCList[] = new ServerConnection[0];
@@ -239,7 +267,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
   /**
    * The ip address or host name this acceptor is to bind to; 
<code>null</code> or "" indicates it
    * will listen on all local addresses.
-   * 
+   *
    * @since GemFire 5.7
    */
   private final String bindHostName;
@@ -249,10 +277,14 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    */
   private final ConnectionListener connectionListener;
 
-  /** The client health monitor tracking connections for this acceptor */
+  /**
+   * The client health monitor tracking connections for this acceptor
+   */
   private ClientHealthMonitor healthMonitor;
 
-  /** bridge's setting of notifyBySubscription */
+  /**
+   * bridge's setting of notifyBySubscription
+   */
   private final boolean notifyBySubscription;
 
   /**
@@ -273,7 +305,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
 
   /**
    * Initializes this acceptor thread to listen for connections on the given 
port.
-   * 
+   *
    * @param port The port on which this acceptor listens for connections. If 
<code>0</code>, a
    *        random port will be chosen.
    * @param bindHostName The ip address or host name this acceptor listens on 
for connections. If
@@ -284,7 +316,6 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    * @param internalCache The GemFire cache whose contents is served to clients
    * @param maxConnections the maximum number of connections allowed in the 
server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
-   * 
    * @see SocketCreator#createServerSocket(int, int, InetAddress)
    * @see ClientHealthMonitor
    * @since GemFire 5.7
@@ -615,7 +646,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 
backwards
    * compatibility.
-   * 
+   *
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -624,7 +655,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 
backwards
    * compatibility.
-   * 
+   *
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -693,7 +724,9 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
     wakeupSelector();
   }
 
-  /** wake up the selector thread */
+  /**
+   * wake up the selector thread
+   */
   private void wakeupSelector() {
     Selector s = getSelector();
     if (s != null && s.isOpen()) {
@@ -760,7 +793,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
 
   /**
    * Ensure that the CachedRegionHelper and ServerConnection classes get 
loaded.
-   * 
+   *
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
@@ -1146,7 +1179,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
 
   /**
    * The work loop of this acceptor
-   * 
+   *
    * @see #accept
    */
   public void run() {
@@ -1369,156 +1402,145 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
     return this.clientServerCnxCount.get();
   }
 
-  protected void handleNewClientConnection(final Socket s) throws IOException {
+  protected void handleNewClientConnection(final Socket socket) throws 
IOException {
     // Read the first byte. If this socket is being used for 'client to server'
     // communication, create a ServerConnection. If this socket is being used
     // for 'server to client' communication, send it to the CacheClientNotifier
     // for processing.
     byte communicationMode;
     if (isSelector()) {
-      ByteBuffer bb = ByteBuffer.allocateDirect(1);
-      final SocketChannel sc = s.getChannel();
-      sc.configureBlocking(false);
+      ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1);
+      final SocketChannel socketChannel = socket.getChannel();
+      socketChannel.configureBlocking(false);
       // try to read the byte first in non-blocking mode
-      int res = sc.read(bb);
-      sc.configureBlocking(true);
+      int res = socketChannel.read(byteBuffer);
+      socketChannel.configureBlocking(true);
       if (res < 0) {
         throw new EOFException();
       } else if (res == 0) {
         // now do a blocking read so setup a timer to close the socket if the
         // the read takes too long
-        SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() {
+        SystemTimer.SystemTimerTask timerTask = new 
SystemTimer.SystemTimerTask() {
           @Override
           public void run2() {
             logger.warn(LocalizedMessage.create(
                 
LocalizedStrings.AcceptorImpl_CACHE_SERVER_TIMED_OUT_WAITING_FOR_HANDSHAKE_FROM__0,
-                s.getRemoteSocketAddress()));
-            closeSocket(s);
+                socket.getRemoteSocketAddress()));
+            closeSocket(socket);
           }
         };
-        this.hsTimer.schedule(st, this.acceptTimeout);
-        res = sc.read(bb);
-        if ((!st.cancel()) || res <= 0) {
+        this.hsTimer.schedule(timerTask, this.acceptTimeout);
+        res = socketChannel.read(byteBuffer);
+        if ((!timerTask.cancel()) || res <= 0) {
           throw new EOFException();
         }
       }
-      communicationMode = bb.get(0);
-      if (logger.isTraceEnabled()) {
-        logger.trace("read communications mode(1) ", communicationMode);
-      }
+      communicationMode = byteBuffer.get(0);
     } else {
-      s.setSoTimeout(this.acceptTimeout);
-      this.socketCreator.configureServerSSLSocket(s);
-      communicationMode = (byte) s.getInputStream().read();
-      if (logger.isTraceEnabled()) {
-        logger.trace("read communications mode(2) ", communicationMode);
-      }
+      socket.setSoTimeout(this.acceptTimeout);
+      this.socketCreator.configureServerSSLSocket(socket);
+      communicationMode = (byte) socket.getInputStream().read();
       if (communicationMode == -1) {
         throw new EOFException();
       }
-      s.setSoTimeout(0);
+      socket.setSoTimeout(0);
     }
 
-    s.setTcpNoDelay(this.tcpNoDelay);
+    socket.setTcpNoDelay(this.tcpNoDelay);
 
-    if (communicationMode == CLIENT_TO_SERVER || communicationMode == 
GATEWAY_TO_GATEWAY
-        || communicationMode == MONITOR_TO_SERVER || communicationMode == 
CLIENT_TO_SERVER_FOR_QUEUE
-        || communicationMode == CLIENT_TO_SERVER_NEW_PROTOCOL) {
-      String communicationModeStr = "";
-      switch (communicationMode) {
-        case CLIENT_TO_SERVER:
-          communicationModeStr = "client";
-          break;
-        case GATEWAY_TO_GATEWAY:
-          communicationModeStr = "gateway";
-          break;
-        case MONITOR_TO_SERVER:
-          communicationModeStr = "monitor";
-          break;
-        case CLIENT_TO_SERVER_FOR_QUEUE:
-          communicationModeStr = "clientToServerForQueue";
-          break;
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug("Bridge server: Initializing {} communication socket: {}",
-            communicationModeStr, s);
+    String communicationModeStr;
+    switch (communicationMode) {
+      default:
+        throw new IOException("Acceptor received unknown communication mode: " 
+ communicationMode);
+
+      case PRIMARY_SERVER_TO_CLIENT:
+        logger.debug(
+            ":Bridge server: Initializing primary server-to-client 
communication socket: {}",
+            socket);
+        AcceptorImpl.this.clientNotifier.registerClient(socket, true, 
this.acceptorId,
+            this.notifyBySubscription);
+        return;
+
+      case SECONDARY_SERVER_TO_CLIENT:
+        logger.debug(
+            ":Bridge server: Initializing secondary server-to-client 
communication socket: {}",
+            socket);
+        AcceptorImpl.this.clientNotifier.registerClient(socket, false, 
this.acceptorId,
+            this.notifyBySubscription);
+        return;
+
+      case CLIENT_TO_SERVER:
+        communicationModeStr = "client";
+        break;
+      case GATEWAY_TO_GATEWAY:
+        communicationModeStr = "gateway";
+        break;
+      case MONITOR_TO_SERVER:
+        communicationModeStr = "monitor";
+        break;
+      case CLIENT_TO_SERVER_FOR_QUEUE:
+        communicationModeStr = "clientToServerForQueue";
+        break;
+      case PROTOBUF_CLIENT_SERVER_PROTOCOL:
+        communicationModeStr = "Protobuf client";
+        break;
+    }
+
+    logger.debug("Bridge server: Initializing {} communication socket: {}", 
communicationModeStr,
+        socket);
+    if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) {
+      int curCnt = this.getClientServerCnxCount();
+      if (curCnt >= this.maxConnections) {
+        logger.warn(LocalizedMessage.create(
+            
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
+            new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
+                Integer.valueOf(this.maxConnections)}));
+        try {
+          ServerHandShakeProcessor.refuse(socket.getOutputStream(),
+              LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
+                  .toLocalizedString(Integer.valueOf(this.maxConnections)));
+        } catch (Exception ex) {
+          logger.debug("rejection message failed", ex);
+        }
+        closeSocket(socket);
+        return;
       }
-      if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) {
-        int curCnt = this.getClientServerCnxCount();
-        if (curCnt >= this.maxConnections) {
-          logger.warn(LocalizedMessage.create(
-              
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
-              new Object[] {s.getInetAddress(), Integer.valueOf(curCnt),
-                  Integer.valueOf(this.maxConnections)}));
-          // if (s != null) (cannot be null)
-          {
-            try {
-              ServerHandShakeProcessor.refuse(s.getOutputStream(),
-                  LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
-                      
.toLocalizedString(Integer.valueOf(this.maxConnections)));
-            } catch (Exception ex) {
-              if (logger.isDebugEnabled()) {
-                logger.debug("rejection message failed", ex);
-              }
-            }
-            closeSocket(s);
-          }
+    }
+
+    ServerConnection serverConn = 
ServerConnectionFactory.makeServerConnection(socket, this.cache,
+        this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, 
this.socketBufferSize,
+        communicationModeStr, communicationMode, this, this.securityService);
+
+    synchronized (this.allSCsLock) {
+      this.allSCs.add(serverConn);
+      ServerConnection snap[] = this.allSCList; // avoid volatile read
+      this.allSCList = (ServerConnection[]) ArrayUtils.insert(snap, 
snap.length, serverConn);
+    }
+    if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) {
+      incClientServerCnxCount();
+    }
+    if (isSelector()) {
+      serverConn.registerWithSelector();
+    } else {
+      try {
+        pool.execute(serverConn);
+      } catch (RejectedExecutionException rejected) {
+        if (!isRunning()) {
           return;
         }
-      }
-      ServerConnection serverConn = 
ServerConnectionFactory.makeServerConnection(s, this.cache,
-          this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, 
this.socketBufferSize,
-          communicationModeStr, communicationMode, this, this.securityService);
-      synchronized (this.allSCsLock) {
-        this.allSCs.add(serverConn);
-        ServerConnection snap[] = this.allSCList; // avoid volatile read
-        this.allSCList = (ServerConnection[]) ArrayUtils.insert(snap, 
snap.length, serverConn);
-      }
-      if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) {
-        incClientServerCnxCount();
-      }
-      if (isSelector()) {
-        serverConn.registerWithSelector();
-      } else {
+        logger.warn(LocalizedMessage.create(
+            
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
+            new Object[] {serverConn}));
         try {
-          pool.execute(serverConn);
-        } catch (RejectedExecutionException rejected) {
-          if (!isRunning()) {
-            return;
-          }
-          logger.warn(LocalizedMessage.create(
-              
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
-              new Object[] {serverConn}));
-          try {
-            ServerHandShakeProcessor.refuse(s.getOutputStream(),
-                LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
-                    .toLocalizedString(Integer.valueOf(this.maxConnections)));
+          ServerHandShakeProcessor.refuse(socket.getOutputStream(),
+              LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
+                  .toLocalizedString(Integer.valueOf(this.maxConnections)));
 
-          } catch (Exception ex) {
-            if (logger.isDebugEnabled()) {
-              logger.debug("rejection message failed", ex);
-            }
-          }
-          serverConn.cleanup();
+        } catch (Exception ex) {
+          logger.debug("rejection message failed", ex);
         }
+        serverConn.cleanup();
       }
-    } else if (communicationMode == PRIMARY_SERVER_TO_CLIENT) {
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            ":Bridge server: Initializing primary server-to-client 
communication socket: {}", s);
-      }
-      // try {
-      AcceptorImpl.this.clientNotifier.registerClient(s, true, this.acceptorId,
-          this.notifyBySubscription);
-    } else if (communicationMode == SECONDARY_SERVER_TO_CLIENT) {
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            ":Bridge server: Initializing secondary server-to-client 
communication socket: {}", s);
-      }
-      AcceptorImpl.this.clientNotifier.registerClient(s, false, 
this.acceptorId,
-          this.notifyBySubscription);
-    } else {
-      throw new IOException("Acceptor received unknown communication mode: " + 
communicationMode);
     }
   }
 
@@ -1645,7 +1667,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    *        then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" 
if all local addresses
    *         will be listened to.
-   * 
+   *
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1682,7 +1704,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
 
   /**
    * Gets the address that this bridge server can be contacted on from 
external processes.
-   * 
+   *
    * @since GemFire 5.7
    */
   public String getExternalAddress() {
@@ -1716,7 +1738,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
   /**
    * This method finds a client notifier and returns it. It is used to 
propagate interest
    * registrations to other servers
-   * 
+   *
    * @return the instance that provides client notification
    */
   public CacheClientNotifier getCacheClientNotifier() {
@@ -1773,7 +1795,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
    * This method returns a thread safe structure which can be iterated over 
without worrying about
    * ConcurrentModificationException. JMX MBeans/Commands need to iterate over 
this list to get
    * client info.
-   * 
+   *
    */
   public ServerConnection[] getAllServerConnectionList() {
     return this.allSCList;

http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
new file mode 100644
index 0000000..8edd83c
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.security.SecurityService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+/**
+ * Holds the socket and protocol handler for the new client protocol. TODO: 
Currently unimplemented
+ * due the the protocol not being there.
+ */
+public class GenericProtocolServerConnection extends ServerConnection {
+  // The new protocol lives in a separate module and gets loaded when this 
class is instantiated.
+  // TODO implement this.
+  private final ClientProtocolMessageHandler messageHandler;
+
+  /**
+   * Creates a new <code>GenericProtocolServerConnection</code> that processes 
messages received
+   * from an edge client over a given <code>Socket</code>.
+   *
+   * @param s
+   * @param c
+   * @param helper
+   * @param stats
+   * @param hsTimeout
+   * @param socketBufferSize
+   * @param communicationModeStr
+   * @param communicationMode
+   * @param acceptor
+   */
+  public GenericProtocolServerConnection(Socket s, InternalCache c, 
CachedRegionHelper helper,
+      CacheServerStats stats, int hsTimeout, int socketBufferSize, String 
communicationModeStr,
+      byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler 
newClientProtocol,
+      SecurityService securityService) {
+    super(s, c, helper, stats, hsTimeout, socketBufferSize, 
communicationModeStr, communicationMode,
+        acceptor, securityService);
+    this.messageHandler = newClientProtocol;
+  }
+
+  @Override
+  protected void doOneMessage() {
+    try {
+      Socket socket = this.getSocket();
+      InputStream inputStream = socket.getInputStream();
+      OutputStream outputStream = socket.getOutputStream();
+
+      // TODO serialization types?
+      messageHandler.receiveMessage(inputStream, outputStream, 
this.getCache());
+    } catch (IOException e) {
+      // TODO?
+    }
+    return;
+  }
+
+  @Override
+  protected boolean doHandShake(byte epType, int qSize) {
+    // no handshake for new client protocol.
+    return true;
+  }
+
+  @Override
+  public boolean isClientServerConnection() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
deleted file mode 100644
index 83b23b1..0000000
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.cache.tier.sockets;
-
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.Acceptor;
-import org.apache.geode.internal.cache.tier.CachedRegionHelper;
-import org.apache.geode.internal.security.SecurityService;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-
-/**
- * Holds the socket and protocol handler for the new client protocol. TODO: 
Currently unimplemented
- * due the the protocol not being there.
- */
-public class NewProtocolServerConnection extends ServerConnection {
-  // The new protocol lives in a separate module and gets loaded when this 
class is instantiated.
-  // TODO implement this.
-  private final ClientProtocolMessageHandler newClientProtocol;
-
-
-  /**
-   * Creates a new <code>NewProtocolServerConnection</code> that processes 
messages received from an
-   * edge client over a given <code>Socket</code>.
-   *
-   * @param s
-   * @param c
-   * @param helper
-   * @param stats
-   * @param hsTimeout
-   * @param socketBufferSize
-   * @param communicationModeStr
-   * @param communicationMode
-   * @param acceptor
-   */
-  public NewProtocolServerConnection(Socket s, InternalCache c, 
CachedRegionHelper helper,
-      CacheServerStats stats, int hsTimeout, int socketBufferSize, String 
communicationModeStr,
-      byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler 
newClientProtocol,
-      SecurityService securityService) {
-    super(s, c, helper, stats, hsTimeout, socketBufferSize, 
communicationModeStr, communicationMode,
-        acceptor, securityService);
-    assert (communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL);
-    this.newClientProtocol = newClientProtocol;
-  }
-
-  @Override
-  protected void doOneMessage() {
-    try {
-      Socket socket = this.getSocket();
-      InputStream inputStream = socket.getInputStream();
-      OutputStream outputStream = socket.getOutputStream();
-
-      // TODO serialization types?
-      newClientProtocol.receiveMessage(inputStream, outputStream, 
this.getCache());
-    } catch (IOException e) {
-      // TODO?
-    }
-    return;
-  }
-
-  @Override
-  protected boolean doHandShake(byte epType, int qSize) {
-    // no handshake for new client protocol.
-    return true;
-  }
-
-  @Override
-  public boolean isClientServerConnection() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index 4f2e304..e4746a7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -28,19 +28,20 @@ import java.net.Socket;
  */
 public class ServerConnectionFactory {
   // TODO: implement ClientProtocolMessageHandler.
-  private static final ClientProtocolMessageHandler newClientProtocol =
+  private static final ClientProtocolMessageHandler protobufProtocolHandler =
       new ClientProtocolMessageHandler();
 
   public static ServerConnection makeServerConnection(Socket s, InternalCache 
c,
       CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int 
socketBufferSize,
       String communicationModeStr, byte communicationMode, Acceptor acceptor,
       SecurityService securityService) throws IOException {
-    if (communicationMode == Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL) {
+    if (communicationMode == Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL) {
       if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
         throw new IOException("Acceptor received unknown communication mode: " 
+ communicationMode);
       } else {
-        return new NewProtocolServerConnection(s, c, helper, stats, hsTimeout, 
socketBufferSize,
-            communicationModeStr, communicationMode, acceptor, 
newClientProtocol, securityService);
+        return new GenericProtocolServerConnection(s, c, helper, stats, 
hsTimeout, socketBufferSize,
+            communicationModeStr, communicationMode, acceptor, 
protobufProtocolHandler,
+            securityService);
       }
     } else {
       return new LegacyServerConnection(s, c, helper, stats, hsTimeout, 
socketBufferSize,

http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java
index 007d1d6..82c4e54 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java
@@ -15,6 +15,8 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static org.junit.Assert.assertEquals;
+
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.server.CacheServer;
@@ -22,45 +24,42 @@ import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.awaitility.Awaitility;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.net.Socket;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-
 /**
- * Test that switching on the header byte makes instances of {@link 
NewProtocolServerConnection}.
+ * Test that switching on the header byte makes instances of
+ * {@link GenericProtocolServerConnection}.
  */
 @Category(IntegrationTest.class)
 public class ServerConnectionFactoryIntegrationTest {
-  /**
-   *
-   * @throws IOException
-   */
+
+  @Rule
+  public final RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
   @Test
   public void testNewProtocolHeaderLeadsToNewProtocolServerConnection() throws 
IOException {
     System.setProperty("geode.feature-protobuf-protocol", "true");
-    try {
-      CacheFactory cacheFactory = new CacheFactory();
-      Cache cache = cacheFactory.create();
+    CacheFactory cacheFactory = new CacheFactory();
+    Cache cache = cacheFactory.create();
 
-      CacheServer cacheServer = cache.addCacheServer();
-      final int cacheServerPort = 
AvailablePortHelper.getRandomAvailableTCPPort();
-      cacheServer.setPort(cacheServerPort);
-      cacheServer.start();
+    CacheServer cacheServer = cache.addCacheServer();
+    final int cacheServerPort = 
AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.start();
 
-      Socket socket = new Socket("localhost", cacheServerPort);
-      Awaitility.await().atMost(5, 
TimeUnit.SECONDS).until(socket::isConnected);
-      socket.getOutputStream().write(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL);
-      socket.getOutputStream().write(222);
-      assertEquals(222, socket.getInputStream().read());
+    Socket socket = new Socket("localhost", cacheServerPort);
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+    socket.getOutputStream().write(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL);
+    socket.getOutputStream().write(222);
+    assertEquals(222, socket.getInputStream().read());
 
-      cache.close();
-    } finally {
-      System.clearProperty("geode.feature-protobuf-protocol");
-    }
+    cache.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 8e241b2..11b5289 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -39,16 +39,19 @@ public class ServerConnectionFactoryTest {
    */
   @Test(expected = IOException.class)
   public void newClientProtocolThrows() throws Exception {
-    
serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL);
+    
serverConnectionMockedExceptForCommunicationMode(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL);
   }
 
   @Test
   public void newClientProtocolSucceedsWithSystemPropertySet() throws 
Exception {
-    System.setProperty("geode.feature-protobuf-protocol", "true");
-    ServerConnection serverConnection =
-        
serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL);
-    assertTrue(serverConnection instanceof NewProtocolServerConnection);
-    System.clearProperty("geode.feature-protobuf-protocol");
+    try {
+      System.setProperty("geode.feature-protobuf-protocol", "true");
+      ServerConnection serverConnection = 
serverConnectionMockedExceptForCommunicationMode(
+          Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL);
+      assertTrue(serverConnection instanceof GenericProtocolServerConnection);
+    } finally {
+      System.clearProperty("geode.feature-protobuf-protocol");
+    }
   }
 
   @Test

Reply via email to