[ 
https://issues.apache.org/jira/browse/GEODE-3895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236671#comment-16236671
 ] 

ASF GitHub Bot commented on GEODE-3895:
---------------------------------------

galen-pivotal closed pull request #1001: GEODE-3895: Add Handshake/Message 
version byte
URL: https://github.com/apache/geode/pull/1001
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index a673c036b1..e6a797c3dc 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -63,7 +63,8 @@
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
 import 
org.apache.geode.internal.cache.client.protocol.ClientProtocolServiceLoader;
 import org.apache.geode.internal.cache.tier.sockets.HandShake;
-import 
org.apache.geode.internal.cache.tier.sockets.ServiceLoadingFailureException;
+import 
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
+import 
org.apache.geode.internal.cache.client.protocol.exception.ServiceVersionNotFoundException;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
@@ -385,8 +386,9 @@ private void processRequest(final Socket socket) {
           if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
               && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
             try {
+              int protocolVersion = input.readUnsignedByte();
               ClientProtocolService clientProtocolService =
-                  clientProtocolServiceLoader.lookupService();
+                  clientProtocolServiceLoader.lookupService(protocolVersion);
               clientProtocolService.initializeStatistics("LocatorStats",
                   internalLocator.getDistributedSystem());
               try (ClientProtocolProcessor pipeline =
@@ -400,6 +402,11 @@ private void processRequest(final Socket socket) {
               log.error("There was an error looking up the client protocol 
service", e);
               socket.close();
               throw new IOException("There was an error looking up the client 
protocol service", e);
+            } catch (ServiceVersionNotFoundException e) {
+              log.error("Unable to find service matching the client protocol 
version byte", e);
+              socket.close();
+              throw new IOException(
+                  "Unable to find service matching the client protocol version 
byte", e);
             }
           } else {
             rejectUnknownProtocolConnection(socket, gossipVersion);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
index 7f50c9fbe6..b86d76a5bf 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
@@ -38,4 +38,6 @@
    * Create a locator processor. The locator does not currently provide any 
authentication.
    */
   ClientProtocolProcessor createProcessorForLocator(InternalLocator locator);
+
+  int getServiceProtocolVersion();
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
index c7ba6e0dce..4b66062ff0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
@@ -19,7 +19,8 @@
 import java.util.List;
 import java.util.ServiceLoader;
 
-import 
org.apache.geode.internal.cache.tier.sockets.ServiceLoadingFailureException;
+import 
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
+import 
org.apache.geode.internal.cache.client.protocol.exception.ServiceVersionNotFoundException;
 
 public class ClientProtocolServiceLoader {
   private final List<ClientProtocolService> clientProtocolServices;
@@ -38,7 +39,7 @@ public ClientProtocolServiceLoader() {
     return resultList;
   }
 
-  public ClientProtocolService lookupService() {
+  public ClientProtocolService lookupService(int protocolVersion) {
     if (clientProtocolServices.isEmpty()) {
       throw new ServiceLoadingFailureException(
           "There is no ClientProtocolService implementation found in JVM");
@@ -48,6 +49,11 @@ public ClientProtocolService lookupService() {
       throw new ServiceLoadingFailureException(
           "There is more than one ClientProtocolService implementation found 
in JVM; aborting");
     }
-    return clientProtocolServices.get(0);
+    ClientProtocolService clientProtocolService = 
clientProtocolServices.get(0);
+    if (clientProtocolService.getServiceProtocolVersion() != protocolVersion) {
+      throw new ServiceVersionNotFoundException(
+          "The ClientProtocolService doesn't match the requested version.");
+    }
+    return clientProtocolService;
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceLoadingFailureException.java
similarity index 94%
rename from 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java
rename to 
geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceLoadingFailureException.java
index be39672067..2c448456f3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceLoadingFailureException.java
@@ -13,7 +13,7 @@
  * the License.
  */
 
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.cache.client.protocol.exception;
 
 import org.apache.geode.GemFireException;
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceVersionNotFoundException.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceVersionNotFoundException.java
new file mode 100644
index 0000000000..d6af72712e
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceVersionNotFoundException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.client.protocol.exception;
+
+import org.apache.geode.GemFireException;
+
+/**
+ * Indicates that no service is found for the given service version.
+ */
+public class ServiceVersionNotFoundException extends GemFireException {
+  public ServiceVersionNotFoundException(String message) {
+    super(message);
+  }
+
+  public ServiceVersionNotFoundException(Exception cause) {
+    super(cause);
+  }
+
+  public ServiceVersionNotFoundException(String message, Exception cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index a6fc973d02..7070b3799c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -25,6 +25,8 @@
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
 import 
org.apache.geode.internal.cache.client.protocol.ClientProtocolServiceLoader;
+import 
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
+import 
org.apache.geode.internal.cache.client.protocol.exception.ServiceVersionNotFoundException;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.security.SecurityService;
@@ -42,9 +44,9 @@ public ServerConnectionFactory() {
 
 
   private synchronized ClientProtocolService getClientProtocolService(
-      StatisticsFactory statisticsFactory, String serverName) {
+      StatisticsFactory statisticsFactory, String serverName, int 
protocolVersion) {
     if (clientProtocolService == null) {
-      clientProtocolService = clientProtocolServiceLoader.lookupService();
+      clientProtocolService = 
clientProtocolServiceLoader.lookupService(protocolVersion);
       clientProtocolService.initializeStatistics(serverName, 
statisticsFactory);
     }
     return clientProtocolService;
@@ -58,11 +60,15 @@ public ServerConnection makeServerConnection(Socket socket, 
InternalCache cache,
       if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
         throw new IOException("Server received unknown communication mode: " + 
communicationMode);
       } else {
+        int protocolVersion = readProtocolVersionByte(socket);
         try {
           return createGenericProtocolServerConnection(socket, cache, helper, 
stats, hsTimeout,
-              socketBufferSize, communicationModeStr, communicationMode, 
acceptor, securityService);
+              socketBufferSize, communicationModeStr, communicationMode, 
acceptor, securityService,
+              protocolVersion);
         } catch (ServiceLoadingFailureException ex) {
           throw new IOException("Could not load protobuf client protocol", ex);
+        } catch (ServiceVersionNotFoundException ex) {
+          throw new IOException("No service matching provided version byte", 
ex);
         }
       }
     } else {
@@ -71,12 +77,16 @@ public ServerConnection makeServerConnection(Socket socket, 
InternalCache cache,
     }
   }
 
+  private int readProtocolVersionByte(Socket socket) throws IOException {
+    return socket.getInputStream().read();
+  }
+
   private ServerConnection createGenericProtocolServerConnection(Socket 
socket, InternalCache cache,
       CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int 
socketBufferSize,
       String communicationModeStr, byte communicationMode, Acceptor acceptor,
-      SecurityService securityService) {
-    ClientProtocolService service =
-        getClientProtocolService(cache.getDistributedSystem(), 
acceptor.getServerName());
+      SecurityService securityService, int protocolVersion) {
+    ClientProtocolService service = 
getClientProtocolService(cache.getDistributedSystem(),
+        acceptor.getServerName(), protocolVersion);
 
     ClientProtocolProcessor processor = service.createProcessorForCache(cache, 
securityService);
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index a3f34b080e..e11b206564 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -16,6 +16,7 @@
 package org.apache.geode.internal.cache.tier.sockets;
 
 import org.apache.geode.internal.cache.InternalCache;
+import 
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.security.SecurityService;
@@ -28,6 +29,7 @@
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 
@@ -108,6 +110,9 @@ private ServerConnection 
serverConnectionMockedExceptForCommunicationMode(byte c
       throws IOException {
     Socket socketMock = mock(Socket.class);
     
when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
+    InputStream streamMock = mock(InputStream.class);
+    when(streamMock.read()).thenReturn(1);
+    when(socketMock.getInputStream()).thenReturn(streamMock);
 
     return new ServerConnectionFactory().makeServerConnection(socketMock, 
mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, 
"", communicationMode,
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
index c87398fa80..669787d6df 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
@@ -59,4 +59,9 @@ ProtocolClientStatistics getStatistics() {
   public ClientProtocolProcessor createProcessorForLocator(InternalLocator 
locator) {
     return new ProtobufLocatorPipeline(protobufStreamProcessor, 
getStatistics(), locator);
   }
+
+  @Override
+  public int getServiceProtocolVersion() {
+    return ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE;
+  }
 }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
index 1e632ca607..2977c5ae9e 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
@@ -20,6 +20,7 @@
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
@@ -88,7 +89,8 @@ public void setupCacheServerAndSocket() throws Exception {
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
     inputStream = socket.getInputStream();
-    outputStream.write(110);
+    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
index 8f2390b997..4c0cd5286b 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
@@ -18,6 +18,7 @@
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
 import 
org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
@@ -107,7 +108,8 @@ public void setUp() throws IOException, 
InvalidProtocolMessageException {
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
     inputStream = socket.getInputStream();
-    outputStream.write(110);
+    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     serializationService = new ProtobufSerializationService();
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/HandshakeIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/HandshakeIntegrationTest.java
new file mode 100644
index 0000000000..d3b78fd34c
--- /dev/null
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/HandshakeIntegrationTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class HandshakeIntegrationTest {
+  private Cache cache;
+
+  @Rule
+  public final RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
+  private OutputStream outputStream;
+  private InputStream inputStream;
+  private ProtobufProtocolSerializer protobufProtocolSerializer;
+  private Socket socket;
+  private SocketChannel socketChannel;
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+
+    // Create a cache with security disabled
+    Properties properties = new Properties();
+    CacheFactory cacheFactory = new CacheFactory(properties);
+    cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+    cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, 
"false");
+    cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, 
"false");
+    cache = cacheFactory.create();
+
+    CacheServer cacheServer = cache.addCacheServer();
+    int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.start();
+
+    InetSocketAddress localhost = new InetSocketAddress("localhost", 
cacheServerPort);
+    socketChannel = SocketChannel.open(localhost);
+
+    socket = socketChannel.socket();
+
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+    outputStream = socket.getOutputStream();
+    inputStream = socket.getInputStream();
+
+    protobufProtocolSerializer = new ProtobufProtocolSerializer();
+  }
+
+  @After
+  public void tearDown() {
+    if (cache != null) {
+      cache.close();
+    }
+  }
+
+  @Test
+  public void testNormalHandshakeSucceeds() throws Exception {
+    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+
+    ClientProtocol.Message.newBuilder()
+        .setRequest(ClientProtocol.Request.newBuilder()
+            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+                
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+                
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+        .build().writeDelimitedTo(outputStream);
+    ClientProtocol.Message handshakeResponse = 
protobufProtocolSerializer.deserialize(inputStream);
+    
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+  }
+
+  @Test
+  public void testInvalidMajorVersionBreaksConnection() throws Exception {
+    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE);
+
+    // Verify that connection is closed
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+      try {
+        assertEquals(-1, socket.getInputStream().read()); // EOF implies 
disconnected.
+        return true;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
index 4257911e0a..d61ac868f4 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
@@ -25,6 +25,7 @@
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.admin.SSLConfig;
 import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
@@ -57,6 +58,7 @@
 import java.net.Socket;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -121,7 +123,6 @@ public void setup() throws Exception {
     cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, 
"false");
     cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, 
"false");
     cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100");
-    cacheFactory.setSecurityManager(null);
     cache = cacheFactory.create();
 
     CacheServer cacheServer = cache.addCacheServer();
@@ -141,7 +142,8 @@ public void setup() throws Exception {
     }
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
-    outputStream.write(110);
+    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     serializationService = new ProtobufSerializationService();
   }
@@ -196,8 +198,11 @@ public void testBasicMessagesAndStats() throws Exception {
 
   @Test
   public void testConnectionCountIsProperlyDecremented() throws Exception {
-    CacheServer cacheServer = 
this.cache.getCacheServers().stream().findFirst().get();
+    List<CacheServer> cacheServers = this.cache.getCacheServers();
+    assertEquals(1, cacheServers.size());
+    CacheServer cacheServer = cacheServers.stream().findFirst().get();
     AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+
     Awaitility.await().atMost(30, TimeUnit.SECONDS)
         .until(() -> acceptor.getClientServerCnxCount() == 1);
 
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
index 61a09f25d1..05f56eefb4 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
@@ -26,6 +26,8 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
@@ -100,7 +102,8 @@ public void setup() throws Exception {
 
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
-    outputStream.write(110);
+    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     serializationService = new ProtobufSerializationService();
 
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
index a256e84f83..fb5ed5eb39 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
@@ -102,7 +102,8 @@ public void setup() throws Exception {
     socket = new Socket("localhost", cacheServerPort);
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     OutputStream outputStream = socket.getOutputStream();
-    outputStream.write(110);
+    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     serializationService = new ProtobufSerializationService();
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -188,6 +189,7 @@ private void validateSocketCreationAndDestruction(int 
cacheServerPort, int conne
           Awaitility.await().atMost(5, 
TimeUnit.SECONDS).until(socket::isConnected);
           OutputStream outputStream = socket.getOutputStream();
           
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+          
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
           ClientProtocol.Message.newBuilder()
               .setRequest(ClientProtocol.Request.newBuilder()
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
index c81f0d6095..248794485c 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
@@ -138,6 +138,7 @@ public void setup() throws Exception {
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
     
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     ClientProtocol.Message.newBuilder()
         .setRequest(ClientProtocol.Request.newBuilder()
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
index 3d239b586f..2558b274fb 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
@@ -24,6 +24,7 @@
 import java.net.Socket;
 import java.util.Properties;
 
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -75,6 +76,7 @@ private Socket createSocket() throws IOException {
     dataOutputStream.writeInt(0);
     // Using the constant from AcceptorImpl to ensure that magic byte is the 
same
     dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber());
+    
dataOutputStream.writeByte(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
     return socket;
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add Handshake/Message version byte
> ----------------------------------
>
>                 Key: GEODE-3895
>                 URL: https://issues.apache.org/jira/browse/GEODE-3895
>             Project: Geode
>          Issue Type: Improvement
>          Components: client/server
>            Reporter: Brian Baynes
>            Priority: Major
>
> Add an extra protocol version byte so that the first two bytes will 
> (initially) be 110 followed by 1. The '1' byte will be increased when the 
> HandshakeRequest or Message changes in such a way that we can't make the 
> handshake backward-compatible.
> Ensure that clients on newer versions will have their connections terminated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to