[ 
https://issues.apache.org/jira/browse/GEODE-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16227691#comment-16227691
 ] 

ASF GitHub Bot commented on GEODE-3894:
---------------------------------------

kohlmu-pivotal closed pull request #985: GEODE-3894: Create HandshakeRequest
URL: https://github.com/apache/geode/pull/985
 
 
   

This is a PR merged from a forked repository.
Because GitHub does not display the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
index f95704b52d..d61a2d558b 100644
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
@@ -21,47 +21,34 @@
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import 
org.apache.geode.internal.protocol.security.processors.AuthorizationSecurityProcessor;
-import 
org.apache.geode.internal.protocol.security.processors.NoAuthenticationSecurityProcessor;
-import org.apache.geode.internal.protocol.security.Authenticator;
-import org.apache.geode.internal.protocol.security.NoOpAuthenticator;
-import org.apache.geode.internal.protocol.security.SecurityProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
-import org.apache.geode.internal.protocol.security.Authorizer;
-import org.apache.geode.internal.protocol.security.NoOpAuthorizer;
 
 @Experimental
 public class MessageExecutionContext {
   private final Cache cache;
   private final Locator locator;
-  private final Authorizer authorizer;
-  private Object authenticatedToken;
   private final ProtocolClientStatistics statistics;
-  private SecurityProcessor securityProcessor;
-  private final Authenticator authenticator;
+  private ConnectionStateProcessor connectionStateProcessor;
 
-
-  public MessageExecutionContext(Cache cache, Authenticator authenticator,
-      Authorizer streamAuthorizer, Object authenticatedToken, 
ProtocolClientStatistics statistics,
-      SecurityProcessor securityProcessor) {
+  public MessageExecutionContext(Cache cache, ProtocolClientStatistics 
statistics,
+      ConnectionStateProcessor initialConnectionStateProcessor) {
     this.cache = cache;
     this.locator = null;
-    this.authorizer = streamAuthorizer;
-    this.authenticatedToken = authenticatedToken;
     this.statistics = statistics;
-    this.securityProcessor = securityProcessor;
-    this.authenticator = authenticator;
+    this.connectionStateProcessor = initialConnectionStateProcessor;
   }
 
-  public MessageExecutionContext(InternalLocator locator, 
ProtocolClientStatistics statistics) {
+  public MessageExecutionContext(InternalLocator locator, 
ProtocolClientStatistics statistics,
+      ConnectionStateProcessor initialConnectionStateProcessor) {
     this.locator = locator;
     this.cache = null;
-    // set a no-op authorizer until such time as locators implement 
authentication
-    // and authorization checks
-    this.authorizer = new NoOpAuthorizer();
-    this.authenticator = new NoOpAuthenticator();
     this.statistics = statistics;
-    this.securityProcessor = new NoAuthenticationSecurityProcessor();
+    connectionStateProcessor = initialConnectionStateProcessor;
+  }
+
+  public ConnectionStateProcessor getConnectionStateProcessor() {
+    return connectionStateProcessor;
   }
 
   /**
@@ -93,21 +80,6 @@ public Locator getLocator() throws 
InvalidExecutionContextException {
   }
 
   /**
-   * Return the authorizer associated with this execution
-   */
-  public Authorizer getAuthorizer() {
-    return authorizer;
-  }
-
-  /**
-   * Returns the authentication token associated with this execution
-   */
-  public Object getAuthenticationToken() {
-    return authenticatedToken;
-  }
-
-
-  /**
    * Returns the statistics for recording operation stats. In a unit test 
environment this may not
    * be a protocol-specific statistics implementation.
    */
@@ -115,19 +87,7 @@ public ProtocolClientStatistics getStatistics() {
     return statistics;
   }
 
-  public Authenticator getAuthenticator() {
-    return authenticator;
-  }
-
-  public SecurityProcessor getSecurityProcessor() {
-    return securityProcessor;
-  }
-
-  public void setSecurityProcessor(AuthorizationSecurityProcessor 
securityProcessor) {
-    this.securityProcessor = securityProcessor;
-  }
-
-  public void setAuthenticationToken(Object authenticationToken) {
-    this.authenticatedToken = authenticationToken;
+  public void setConnectionStateProcessor(ConnectionStateProcessor 
connectionStateProcessor) {
+    this.connectionStateProcessor = connectionStateProcessor;
   }
 }
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/ProtocolErrorCode.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/ProtocolErrorCode.java
index 302889cb03..7b289fcee7 100644
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/ProtocolErrorCode.java
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/ProtocolErrorCode.java
@@ -20,9 +20,11 @@
   UNSUPPORTED_VERSION(1101),
   UNSUPPORTED_OPERATION(1102),
   UNSUPPORTED_AUTHENTICATION_MODE(1103),
+  HANDSHAKE_REQUIRED(1104),
   AUTHENTICATION_FAILED(1200),
   AUTHORIZATION_FAILED(1201),
   ALREADY_AUTHENTICATED(1202),
+  AUTHENTICATION_NOT_SUPPORTED(1203),
   LOW_MEMORY(1300),
   DATA_UNREACHABLE(1301),
   OPERATION_TIMEOUT(1302),
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/Authenticator.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/Authenticator.java
deleted file mode 100644
index 42dd22bd00..0000000000
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/Authenticator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.security;
-
-import org.apache.geode.security.AuthenticationFailedException;
-
-/**
- * Implementers of this interface do some message passing over a socket to 
authenticate a client,
- * then hand off the connection to the protocol that will talk on the socket.
- *
- * If authentication fails, an implementor may continue to wait for another 
valid authentication
- * exchange.
- */
-public interface Authenticator<SecurityProperties, AuthenticationToken> {
-  /**
-   * @param securityProperties a generic object that is required for 
authentication dependent on
-   *        implementation
-   * @return authenticated principal
-   */
-  AuthenticationToken authenticate(SecurityProperties securityProperties)
-      throws AuthenticationFailedException;
-}
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/Authorizer.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/Authorizer.java
deleted file mode 100644
index 525e42d366..0000000000
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/Authorizer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.security;
-
-import org.apache.geode.security.ResourcePermission;
-
-public interface Authorizer {
-  boolean authorize(Object authenticatedToken, ResourcePermission 
permissionRequested);
-}
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/SecurityProcessor.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/SecurityProcessor.java
deleted file mode 100644
index a60cd73b3b..0000000000
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/SecurityProcessor.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.security;
-
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.OperationContext;
-
-public interface SecurityProcessor<RequestMessage> {
-  /**
-   * This method will validate an Operation. In the case of a failure, it will 
throw an exception.
-   */
-  void validateOperation(RequestMessage request, MessageExecutionContext 
messageExecutionContext,
-      OperationContext operationContext);
-}
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/processors/NoAuthenticationSecurityProcessor.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionAuthenticatingStateProcessor.java
similarity index 57%
rename from 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/processors/NoAuthenticationSecurityProcessor.java
rename to 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionAuthenticatingStateProcessor.java
index 4ed23f5087..a57c90e700 100644
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/processors/NoAuthenticationSecurityProcessor.java
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionAuthenticatingStateProcessor.java
@@ -12,17 +12,20 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.security.processors;
+package org.apache.geode.internal.protocol.state;
 
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.OperationContext;
-import org.apache.geode.internal.protocol.security.SecurityProcessor;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
 
-public class NoAuthenticationSecurityProcessor implements 
SecurityProcessor<Object> {
+import java.util.Properties;
 
+public interface ConnectionAuthenticatingStateProcessor extends 
ConnectionStateProcessor {
   @Override
-  public void validateOperation(Object request, MessageExecutionContext 
messageExecutionContext,
-      OperationContext operationContext) {
-    // A truly "no-op" operation :)
+  default ConnectionAuthenticatingStateProcessor allowAuthentication()
+      throws ConnectionStateException {
+    return this;
   }
+
+  // Handle authentication properties, this function should return a new 
ConnectionStateProcessor
+  // based on the result of this operation.
+  ConnectionStateProcessor authenticate(Properties properties);
 }
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/NoOpAuthorizer.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
similarity index 61%
rename from 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/NoOpAuthorizer.java
rename to 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
index 61d038342e..f885bfc16b 100644
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/NoOpAuthorizer.java
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
@@ -12,16 +12,17 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.security;
+package org.apache.geode.internal.protocol.state;
 
-import org.apache.geode.security.ResourcePermission;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
 
-/**
- * An implementation of {@link Authorizer} that doesn't use its parameters and 
always returns true.
- */
-public class NoOpAuthorizer implements Authorizer {
+public interface ConnectionHandshakingStateProcessor extends 
ConnectionStateProcessor {
   @Override
-  public boolean authorize(Object authenticatedToken, ResourcePermission 
permissionRequested) {
-    return true;
+  default ConnectionHandshakingStateProcessor allowHandshake() throws 
ConnectionStateException {
+    return this;
   }
+
+  // This is called when a handshake operation succeeds to get the processor 
for the next connection
+  // state.
+  ConnectionStateProcessor handshakeSucceeded();
 }
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionShiroAuthorizingStateProcessor.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionShiroAuthorizingStateProcessor.java
new file mode 100644
index 0000000000..0dd2bbc2a2
--- /dev/null
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionShiroAuthorizingStateProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.state;
+
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.OperationContext;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.shiro.subject.Subject;
+import org.apache.shiro.util.ThreadState;
+
+public class ConnectionShiroAuthorizingStateProcessor implements 
ConnectionStateProcessor {
+  private final SecurityService securityService;
+  private final Subject subject;
+
+  public ConnectionShiroAuthorizingStateProcessor(SecurityService 
securityService,
+      Subject subject) {
+    this.securityService = securityService;
+    this.subject = subject;
+  }
+
+  @Override
+  public void validateOperation(MessageExecutionContext messageContext,
+      OperationContext operationContext) throws ConnectionStateException {
+    ThreadState threadState = securityService.bindSubject(subject);
+    try {
+      
securityService.authorize(operationContext.getAccessPermissionRequired());
+    } catch (NotAuthorizedException e) {
+      messageContext.getStatistics().incAuthorizationViolations();
+      throw new 
ConnectionStateException(ProtocolErrorCode.AUTHORIZATION_FAILED,
+          "The user is not authorized to complete this operation");
+    } finally {
+      threadState.restore();
+    }
+  }
+
+  @Override
+  public ConnectionAuthenticatingStateProcessor allowAuthentication()
+      throws ConnectionStateException {
+    throw new ConnectionStateException(ProtocolErrorCode.ALREADY_AUTHENTICATED,
+        "The user has already been authenticated for this connection. 
Re-authentication is not supported at this time.");
+  }
+}
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
new file mode 100644
index 0000000000..24d6d01600
--- /dev/null
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.state;
+
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.OperationContext;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+
+/**
+ * This object encapsulates any operation processing that is specific to the 
current state of a
+ * protocol connection. It is used to inject behavior at specific points in 
the processing of
+ * protocol operations as opposed to requiring an explicit state machine to 
drive the state
+ * transitions from outside the protocol.
+ */
+public interface ConnectionStateProcessor {
+  /**
+   * @throws ConnectionStateException if incapable of handling the given 
operationContext when the
+   *         connection is in the state contained in the provided 
messageContext. Otherwise, does
+   *         nothing.
+   */
+  void validateOperation(MessageExecutionContext messageContext, 
OperationContext operationContext)
+      throws ConnectionStateException;
+
+  /**
+   * This indicates whether this specific state processor is able to handle 
authentication requests.
+   * 
+   * @return specialized ConnectionAuthenticatingStateProcessor interface 
implementation which can
+   *         move to a new state
+   * @throws ConnectionStateException if unable to handle handshakes in this 
state.
+   */
+  default ConnectionAuthenticatingStateProcessor allowAuthentication()
+      throws ConnectionStateException {
+    throw new ConnectionStateException(ProtocolErrorCode.UNSUPPORTED_OPERATION,
+        "Requested operation not allowed at this time");
+  }
+
+  /**
+   * This indicates whether this specific state processor is able to handle 
handshake requests.
+   * 
+   * @return specialized ConnectionHandshakingStateProcessor interface 
implementation which can move
+   *         to a new state
+   * @throws ConnectionStateException if unable to handle handshakes in this 
state.
+   */
+  default ConnectionHandshakingStateProcessor allowHandshake() throws 
ConnectionStateException {
+    throw new ConnectionStateException(ProtocolErrorCode.UNSUPPORTED_OPERATION,
+        "Requested operation not allowed at this time");
+  }
+}
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/InvalidConfigAuthenticator.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/LegacySecurityConnectionStateProcessor.java
similarity index 56%
rename from 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/InvalidConfigAuthenticator.java
rename to 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/LegacySecurityConnectionStateProcessor.java
index 5750e1315c..736c9c8425 100644
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/InvalidConfigAuthenticator.java
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/LegacySecurityConnectionStateProcessor.java
@@ -12,23 +12,25 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
+package org.apache.geode.internal.protocol.state;
 
-package org.apache.geode.internal.protocol.security;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.internal.logging.LogService;
-import 
org.apache.geode.internal.protocol.security.exception.IncompatibleAuthenticationMechanismsException;
-import org.apache.geode.security.AuthenticationFailedException;
-
-public class InvalidConfigAuthenticator implements Authenticator<Object, 
Object> {
-  private static final Logger logger = 
LogService.getLogger(InvalidConfigAuthenticator.class);
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.OperationContext;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
 
+public class LegacySecurityConnectionStateProcessor implements 
ConnectionStateProcessor {
   @Override
-  public Object authenticate(Object object) throws 
AuthenticationFailedException {
-    logger.warn(
+  public void validateOperation(MessageExecutionContext messageContext,
+      OperationContext operationContext) throws ConnectionStateException {
+    throw new 
ConnectionStateException(ProtocolErrorCode.UNSUPPORTED_AUTHENTICATION_MODE,
         "Attempting to authenticate incoming protobuf message using legacy 
security implementation. This is not supported. Failing authentication.");
-    throw new IncompatibleAuthenticationMechanismsException(
+  }
+
+  @Override
+  public ConnectionAuthenticatingStateProcessor allowAuthentication()
+      throws ConnectionStateException {
+    throw new 
ConnectionStateException(ProtocolErrorCode.UNSUPPORTED_AUTHENTICATION_MODE,
         "Attempting to authenticate incoming protobuf message using legacy 
security implementation. This is not supported. Failing authentication.");
   }
 }
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/processors/AuthorizationSecurityProcessor.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/NoSecurityConnectionStateProcessor.java
similarity index 57%
rename from 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/processors/AuthorizationSecurityProcessor.java
rename to 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/NoSecurityConnectionStateProcessor.java
index 59d0084a57..2c697b82b2 100644
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/processors/AuthorizationSecurityProcessor.java
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/NoSecurityConnectionStateProcessor.java
@@ -12,18 +12,21 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.security.processors;
+package org.apache.geode.internal.protocol.state;
 
 import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.OperationContext;
-import org.apache.geode.internal.protocol.security.SecurityProcessor;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
 
-public class AuthorizationSecurityProcessor implements 
SecurityProcessor<Object> {
+public class NoSecurityConnectionStateProcessor implements 
ConnectionStateProcessor {
   @Override
-  public void validateOperation(Object request, MessageExecutionContext 
messageExecutionContext,
-      OperationContext operationContext) {
-    messageExecutionContext.getAuthorizer().authorize(
-        messageExecutionContext.getAuthenticationToken(),
-        operationContext.getAccessPermissionRequired());
+  public void validateOperation(MessageExecutionContext messageContext,
+      OperationContext operationContext) throws ConnectionStateException {}
+
+  public ConnectionAuthenticatingStateProcessor allowAuthentication()
+      throws ConnectionStateException {
+    throw new 
ConnectionStateException(ProtocolErrorCode.AUTHENTICATION_NOT_SUPPORTED,
+        "This machine is not set to require user authentication.");
   }
 }
diff --git 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/NoOpAuthenticator.java
 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/exception/ConnectionStateException.java
similarity index 64%
rename from 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/NoOpAuthenticator.java
rename to 
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/exception/ConnectionStateException.java
index 55f5addd4c..a0e29224f7 100644
--- 
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/security/NoOpAuthenticator.java
+++ 
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/exception/ConnectionStateException.java
@@ -12,18 +12,19 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.security;
+package org.apache.geode.internal.protocol.state.exception;
 
-import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
 
-/**
- * An implementation of {@link Authenticator} that doesn't use its parameters 
and always returns
- * true.
- */
-public class NoOpAuthenticator implements Authenticator<Object, Object> {
+public class ConnectionStateException extends Exception {
+  private final ProtocolErrorCode errorCode;
+
+  public ConnectionStateException(ProtocolErrorCode errorCode, String 
errorMessage) {
+    super(errorMessage);
+    this.errorCode = errorCode;
+  }
 
-  @Override
-  public Object authenticate(Object inputObject) throws 
AuthenticationFailedException {
-    return null;
+  public ProtocolErrorCode getErrorCode() {
+    return errorCode;
   }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufCachePipeline.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufCachePipeline.java
index 44829e0561..b7fc149725 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufCachePipeline.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufCachePipeline.java
@@ -22,28 +22,29 @@
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
+import 
org.apache.geode.internal.protocol.protobuf.state.ProtobufConnectionHandshakeStateProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.security.Authenticator;
-import org.apache.geode.internal.protocol.security.Authorizer;
-import org.apache.geode.internal.protocol.security.SecurityProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
+import org.apache.geode.internal.security.SecurityService;
 
 
 @Experimental
 public final class ProtobufCachePipeline implements ClientProtocolProcessor {
   private final ProtocolClientStatistics statistics;
   private final ProtobufStreamProcessor streamProcessor;
-
+  private final ConnectionStateProcessor initialCacheConnectionStateProcessor;
   private final MessageExecutionContext messageExecutionContext;
 
   ProtobufCachePipeline(ProtobufStreamProcessor protobufStreamProcessor,
-      ProtocolClientStatistics statistics, Cache cache, Authenticator 
authenticator,
-      Authorizer authorizer, SecurityProcessor securityProcessor) {
+      ProtocolClientStatistics statistics, Cache cache, SecurityService 
securityService) {
     this.streamProcessor = protobufStreamProcessor;
     this.statistics = statistics;
     this.statistics.clientConnected();
-    this.messageExecutionContext = new MessageExecutionContext(cache, 
authenticator, authorizer,
-        null, statistics, securityProcessor);
+    this.initialCacheConnectionStateProcessor =
+        new ProtobufConnectionHandshakeStateProcessor(securityService);
+    this.messageExecutionContext =
+        new MessageExecutionContext(cache, statistics, 
initialCacheConnectionStateProcessor);
   }
 
   @Override
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufLocatorPipeline.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufLocatorPipeline.java
index 12aab64312..b73da81e5f 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufLocatorPipeline.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufLocatorPipeline.java
@@ -23,7 +23,9 @@
 import org.apache.geode.cache.IncompatibleVersionException;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
 
 @Experimental
@@ -31,6 +33,7 @@
   private final ProtocolClientStatistics statistics;
   private final InternalLocator locator;
   private final ProtobufStreamProcessor streamProcessor;
+  private final ConnectionStateProcessor locatorConnectionState;
 
   ProtobufLocatorPipeline(ProtobufStreamProcessor protobufStreamProcessor,
       ProtocolClientStatistics statistics, InternalLocator locator) {
@@ -38,13 +41,14 @@
     this.statistics = statistics;
     this.locator = locator;
     this.statistics.clientConnected();
+    this.locatorConnectionState = new NoSecurityConnectionStateProcessor();
   }
 
   @Override
   public void processMessage(InputStream inputStream, OutputStream outputStream)
       throws IOException, IncompatibleVersionException {
     streamProcessor.receiveMessage(inputStream, outputStream,
-        new MessageExecutionContext(locator, statistics));
+        new MessageExecutionContext(locator, statistics, locatorConnectionState));
   }
 
   @Override
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOpsProcessor.java
index 79784ebf63..5b2c18c516 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOpsProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOpsProcessor.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.protocol.protobuf;
 
+import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
@@ -25,10 +26,7 @@
 import org.apache.geode.internal.protocol.Result;
 import org.apache.geode.internal.protocol.protobuf.registry.ProtobufOperationContextRegistry;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
-import org.apache.geode.internal.protocol.security.SecurityProcessor;
 import org.apache.geode.internal.protocol.serialization.SerializationService;
-import org.apache.geode.security.AuthenticationRequiredException;
-import org.apache.geode.security.NotAuthorizedException;
 
 import static org.apache.geode.internal.protocol.ProtocolErrorCode.*;
 
@@ -57,19 +55,13 @@ public ProtobufOpsProcessor(SerializationService serializationService,
         protobufOperationContextRegistry.getOperationContext(requestType);
     Result result;
 
-    SecurityProcessor securityProcessor = messageExecutionContext.getSecurityProcessor();
     try {
-      securityProcessor.validateOperation(request, messageExecutionContext, operationContext);
+      messageExecutionContext.getConnectionStateProcessor()
+          .validateOperation(messageExecutionContext, operationContext);
       result = processOperation(request, messageExecutionContext, requestType, operationContext);
-    } catch (AuthenticationRequiredException e) {
-      logger.warn(e);
-      result = Failure
-          .of(ProtobufResponseUtilities.makeErrorResponse(AUTHENTICATION_FAILED, e.getMessage()));
-    } catch (NotAuthorizedException e) {
-      logger.warn(e);
-      messageExecutionContext.getStatistics().incAuthorizationViolations();
-      result = Failure.of(ProtobufResponseUtilities.makeErrorResponse(AUTHORIZATION_FAILED,
-          "The user is not authorized to complete this operation"));
+    } catch (ConnectionStateException e) {
+      logger.warn(e.getMessage());
+      result = Failure.of(ProtobufResponseUtilities.makeErrorResponse(e));
     }
 
     return ((ClientProtocol.Response.Builder) result.map(operationContext.getToResponse(),
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
index 6c63042c04..c87398fa80 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
@@ -20,9 +20,6 @@
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
-import org.apache.geode.internal.protocol.security.Authenticator;
-import org.apache.geode.internal.protocol.security.Authorizer;
-import org.apache.geode.internal.protocol.protobuf.security.ProtobufSecurityLookupService;
 import org.apache.geode.internal.protocol.statistics.NoOpStatistics;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
 import org.apache.geode.internal.security.SecurityService;
@@ -30,8 +27,6 @@
 public class ProtobufProtocolService implements ClientProtocolService {
   private volatile ProtocolClientStatistics statistics;
   private final ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
-  private final ProtobufSecurityLookupService protobufSecurityLookupService =
-      new ProtobufSecurityLookupService();
 
   @Override
   public synchronized void initializeStatistics(String statisticsName, StatisticsFactory factory) {
@@ -45,12 +40,8 @@ public ClientProtocolProcessor createProcessorForCache(Cache cache,
       SecurityService securityService) {
     assert (statistics != null);
 
-    Authenticator authenticator =
-        protobufSecurityLookupService.lookupAuthenticator(securityService);
-    Authorizer authorizer = protobufSecurityLookupService.lookupAuthorizer(securityService);
-
-    return new ProtobufCachePipeline(protobufStreamProcessor, getStatistics(), cache, authenticator,
-        authorizer, protobufSecurityLookupService.lookupProcessor(securityService));
+    return new ProtobufCachePipeline(protobufStreamProcessor, getStatistics(), cache,
+        securityService);
   }
 
   /**
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/HandshakeRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/HandshakeRequestOperationHandler.java
new file mode 100644
index 0000000000..e0b52283b3
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/HandshakeRequestOperationHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.operations;
+
+import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
+import org.apache.geode.internal.protocol.Failure;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.Result;
+import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+import org.apache.geode.internal.protocol.Success;
+import org.apache.geode.internal.protocol.operations.OperationHandler;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
+import org.apache.geode.internal.protocol.serialization.SerializationService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HandshakeRequestOperationHandler implements
+    OperationHandler<ConnectionAPI.HandshakeRequest, ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> {
+  private static final Logger logger = LogManager.getLogger();
+
+  @Override
+  public Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> process(
+      SerializationService serializationService, ConnectionAPI.HandshakeRequest request,
+      MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException {
+    ConnectionHandshakingStateProcessor stateProcessor;
+
+    try {
+      stateProcessor = messageExecutionContext.getConnectionStateProcessor().allowHandshake();
+    } catch (ConnectionStateException e) {
+      return Failure.of(ProtobufResponseUtilities.makeErrorResponse(e));
+    }
+
+    boolean handshakeSucceeded = false;
+    // Require an exact match with our version of the protobuf code for this implementation
+    if (request.getMajorVersion() == ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE
+        && request.getMinorVersion() == ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE) {
+      handshakeSucceeded = true;
+      ConnectionStateProcessor nextStateProcessor = stateProcessor.handshakeSucceeded();
+      messageExecutionContext.setConnectionStateProcessor(nextStateProcessor);
+    }
+
+    return Success.of(ConnectionAPI.HandshakeResponse.newBuilder()
+        .setServerMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+        .setServerMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)
+        .setHandshakePassed(handshakeSucceeded).build());
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/security/AuthenticationRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/security/AuthenticationRequestOperationHandler.java
index b15e8560a7..6d31b94713 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/security/AuthenticationRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/security/AuthenticationRequestOperationHandler.java
@@ -16,6 +16,11 @@
 
 import java.util.Properties;
 
+import org.apache.geode.internal.protocol.state.ConnectionAuthenticatingStateProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
+import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -26,49 +31,42 @@
 import org.apache.geode.internal.protocol.Result;
 import org.apache.geode.internal.protocol.Success;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
-import org.apache.geode.internal.protocol.protobuf.AuthenticationAPI;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.security.exception.IncompatibleAuthenticationMechanismsException;
-import org.apache.geode.internal.protocol.security.processors.AuthorizationSecurityProcessor;
-import org.apache.geode.internal.protocol.security.Authenticator;
 import org.apache.geode.internal.protocol.serialization.SerializationService;
 import org.apache.geode.security.AuthenticationFailedException;
 
 public class AuthenticationRequestOperationHandler implements
-    OperationHandler<AuthenticationAPI.AuthenticationRequest, AuthenticationAPI.AuthenticationResponse, ClientProtocol.ErrorResponse> {
+    OperationHandler<ConnectionAPI.AuthenticationRequest, ConnectionAPI.AuthenticationResponse, ClientProtocol.ErrorResponse> {
   private static final Logger logger = LogManager.getLogger();
 
   @Override
-  public Result<AuthenticationAPI.AuthenticationResponse, ClientProtocol.ErrorResponse> process(
-      SerializationService serializationService, AuthenticationAPI.AuthenticationRequest request,
+  public Result<ConnectionAPI.AuthenticationResponse, ClientProtocol.ErrorResponse> process(
+      SerializationService serializationService, ConnectionAPI.AuthenticationRequest request,
       MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException {
+    ConnectionAuthenticatingStateProcessor stateProcessor;
 
-    if (messageExecutionContext.getAuthenticationToken() != null) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          .setError(buildAndLogError(ProtocolErrorCode.ALREADY_AUTHENTICATED,
-              "The user has already been authenticated for this connection. Re-authentication is not supported at this time.",
-              null))
-          .build());
+    try {
+      stateProcessor = messageExecutionContext.getConnectionStateProcessor().allowAuthentication();
+    } catch (ConnectionStateException e) {
+      return Failure.of(ProtobufResponseUtilities.makeErrorResponse(e));
     }
 
-    Authenticator authenticator = messageExecutionContext.getAuthenticator();
     Properties properties = new Properties();
     properties.putAll(request.getCredentialsMap());
 
     try {
-      Object authenticationToken = authenticator.authenticate(properties);
-      messageExecutionContext.setSecurityProcessor(new AuthorizationSecurityProcessor());
-      messageExecutionContext.setAuthenticationToken(authenticationToken);
+      messageExecutionContext.setConnectionStateProcessor(stateProcessor.authenticate(properties));
       return Success
-          .of(AuthenticationAPI.AuthenticationResponse.newBuilder().setAuthenticated(true).build());
+          .of(ConnectionAPI.AuthenticationResponse.newBuilder().setAuthenticated(true).build());
     } catch (IncompatibleAuthenticationMechanismsException e) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder().setError(
           buildAndLogError(ProtocolErrorCode.UNSUPPORTED_AUTHENTICATION_MODE, e.getMessage(), e))
           .build());
     } catch (AuthenticationFailedException e) {
-      return Success.of(
-          AuthenticationAPI.AuthenticationResponse.newBuilder().setAuthenticated(false).build());
+      return Success
+          .of(ConnectionAPI.AuthenticationResponse.newBuilder().setAuthenticated(false).build());
     }
   }
 
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/registry/ProtobufOperationContextRegistry.java
index 6fd228dc94..d0dc8bdb9c 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/registry/ProtobufOperationContextRegistry.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/registry/ProtobufOperationContextRegistry.java
@@ -21,12 +21,14 @@
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol.Request.RequestAPICase;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import org.apache.geode.internal.protocol.protobuf.ProtobufOperationContext;
 import org.apache.geode.internal.protocol.protobuf.operations.GetAllRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.operations.GetAvailableServersOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.operations.GetRegionNamesRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.operations.GetRegionRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.operations.GetRequestOperationHandler;
+import org.apache.geode.internal.protocol.protobuf.operations.HandshakeRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.operations.PutAllRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.operations.PutRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.operations.RemoveRequestOperationHandler;
@@ -109,5 +111,12 @@ private void addContexts() {
             opsResp -> ClientProtocol.Response.newBuilder().setGetAvailableServersResponse(opsResp),
             new ResourcePermission(ResourcePermission.Resource.CLUSTER,
                 ResourcePermission.Operation.READ)));
+
+    operationContexts.put(RequestAPICase.HANDSHAKEREQUEST,
+        new ProtobufOperationContext<>(ClientProtocol.Request::getHandshakeRequest,
+            new HandshakeRequestOperationHandler(),
+            opsResp -> ClientProtocol.Response.newBuilder().setHandshakeResponse(opsResp),
+            new ResourcePermission(ResourcePermission.Resource.DATA,
+                ResourcePermission.Operation.READ)));
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufSecurityLookupService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufSecurityLookupService.java
deleted file mode 100644
index f19c763b04..0000000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufSecurityLookupService.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf.security;
-
-import org.apache.geode.internal.protocol.protobuf.security.processors.AuthenticationSecurityProcessor;
-import org.apache.geode.internal.protocol.security.Authenticator;
-import org.apache.geode.internal.protocol.security.Authorizer;
-import org.apache.geode.internal.protocol.security.InvalidConfigAuthenticator;
-import org.apache.geode.internal.protocol.security.NoOpAuthenticator;
-import org.apache.geode.internal.protocol.security.NoOpAuthorizer;
-import org.apache.geode.internal.protocol.security.SecurityProcessor;
-import org.apache.geode.internal.protocol.security.processors.NoAuthenticationSecurityProcessor;
-import org.apache.geode.internal.security.SecurityService;
-
-public class ProtobufSecurityLookupService {
-  private final Authenticator[] authenticators = new Authenticator[3];
-  private final Authorizer[] authorizers = new Authorizer[2];
-  private final SecurityProcessor[] securityProcessors = new SecurityProcessor[2];
-
-  public ProtobufSecurityLookupService() {
-    initializeAuthenticators();
-    initializeAuthortizers();
-    initializeSecurityProcessors();
-  }
-
-  private void initializeSecurityProcessors() {
-    securityProcessors[0] = new NoAuthenticationSecurityProcessor();
-    securityProcessors[1] = new AuthenticationSecurityProcessor();
-  }
-
-  private void initializeAuthenticators() {
-    authenticators[0] = new NoOpAuthenticator();
-    authenticators[1] = new InvalidConfigAuthenticator();
-  }
-
-  private void initializeAuthortizers() {
-    authorizers[0] = new NoOpAuthorizer();
-  }
-
-  public SecurityProcessor lookupProcessor(SecurityService securityService) {
-    return securityProcessors[isSecurityEnabled(securityService) ? 1 : 0];
-  }
-
-  public Authenticator lookupAuthenticator(SecurityService securityService) {
-    if (securityService.isIntegratedSecurity()) {
-      // no need to care about thread safety, eventually there will only be one authenticator
-      if (authenticators[2] == null) {
-        authenticators[2] = new ProtobufShiroAuthenticator(securityService);
-      }
-      // Simple authenticator...normal shiro
-      return authenticators[2];
-    }
-    if (isLegacySecurity(securityService)) {
-      // Failing authentication...legacy security
-      return authenticators[1];
-    } else {
-      // Noop authenticator...no security
-      return authenticators[0];
-    }
-  }
-
-  public Authorizer lookupAuthorizer(SecurityService securityService) {
-    if (securityService.isIntegratedSecurity()) {
-      // Simple authenticator...normal shiro
-      if (authorizers[1] == null) {
-        authorizers[1] = new ProtobufShiroAuthorizer(securityService);
-      }
-      // Simple authenticator...normal shiro
-      return authorizers[1];
-    }
-    if (isLegacySecurity(securityService)) {
-      // Failing authentication...legacy security
-      // This should never be called.
-      return null;
-    } else {
-      // Noop authenticator...no security
-      return authorizers[0];
-    }
-  }
-
-  private boolean isLegacySecurity(SecurityService securityService) {
-    return securityService.isPeerSecurityRequired() || securityService.isClientSecurityRequired();
-  }
-
-  private boolean isSecurityEnabled(SecurityService securityService) {
-    return securityService.isIntegratedSecurity() || isLegacySecurity(securityService);
-  }
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufShiroAuthenticator.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufShiroAuthenticator.java
deleted file mode 100644
index fc95751433..0000000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufShiroAuthenticator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf.security;
-
-import java.util.Properties;
-
-import org.apache.shiro.subject.Subject;
-
-import org.apache.geode.internal.protocol.security.Authenticator;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.AuthenticationFailedException;
-
-public class ProtobufShiroAuthenticator implements Authenticator<Properties, Subject> {
-  private final SecurityService securityService;
-
-  public ProtobufShiroAuthenticator(SecurityService securityService) {
-    this.securityService = securityService;
-  }
-
-  @Override
-  public Subject authenticate(Properties properties) throws AuthenticationFailedException {
-    return securityService.login(properties);
-  }
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufShiroAuthorizer.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufShiroAuthorizer.java
deleted file mode 100644
index 96610e49c2..0000000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/ProtobufShiroAuthorizer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf.security;
-
-import org.apache.shiro.subject.Subject;
-import org.apache.shiro.util.ThreadState;
-
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.ResourcePermission;
-import org.apache.geode.internal.protocol.security.Authorizer;
-
-public class ProtobufShiroAuthorizer implements Authorizer {
-  private final SecurityService securityService;
-
-  public ProtobufShiroAuthorizer(SecurityService securityService) {
-    this.securityService = securityService;
-  }
-
-  @Override
-  public boolean authorize(Object authenticatedToken, ResourcePermission permissionRequested) {
-    ThreadState threadState = securityService.bindSubject((Subject) authenticatedToken);
-
-    try {
-      securityService.authorize(permissionRequested);
-      return true;
-    } finally {
-      threadState.restore();
-    }
-  }
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/processors/AuthenticationSecurityProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/processors/AuthenticationSecurityProcessor.java
deleted file mode 100644
index 0f48c2282f..0000000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/processors/AuthenticationSecurityProcessor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf.security.processors;
-
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.OperationContext;
-import org.apache.geode.internal.protocol.protobuf.AuthenticationAPI;
-import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
-import org.apache.geode.internal.protocol.security.SecurityProcessor;
-import org.apache.geode.security.AuthenticationRequiredException;
-
-public class AuthenticationSecurityProcessor implements SecurityProcessor<ClientProtocol.Request> {
-  @Override
-  public void validateOperation(ClientProtocol.Request request,
-      MessageExecutionContext messageExecutionContext, OperationContext operationContext) {
-    Object fromRequest = operationContext.getFromRequest().apply(request);
-    if (!(fromRequest instanceof AuthenticationAPI.AuthenticationRequest)) {
-      throw new AuthenticationRequiredException(
-          "Expecting an authentication message. Received a " + fromRequest.getClass() + " message");
-    }
-  }
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/state/ConnectionShiroAuthenticatingStateProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/state/ConnectionShiroAuthenticatingStateProcessor.java
new file mode 100644
index 0000000000..88f13fc75d
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/state/ConnectionShiroAuthenticatingStateProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.state;
+
+import org.apache.geode.internal.protocol.state.ConnectionAuthenticatingStateProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.OperationContext;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+import org.apache.geode.internal.protocol.protobuf.operations.security.AuthenticationRequestOperationHandler;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.shiro.subject.Subject;
+
+import java.util.Properties;
+
+public class ConnectionShiroAuthenticatingStateProcessor
+    implements ConnectionAuthenticatingStateProcessor {
+  private final SecurityService securityService;
+
+  public ConnectionShiroAuthenticatingStateProcessor(SecurityService securityService) {
+    this.securityService = securityService;
+  }
+
+  @Override
+  public void validateOperation(MessageExecutionContext messageContext,
+      OperationContext operationContext) throws ConnectionStateException {
+    if (!(operationContext
+        .getOperationHandler() instanceof AuthenticationRequestOperationHandler)) {
+      throw new ConnectionStateException(ProtocolErrorCode.AUTHENTICATION_FAILED,
+          "User has not yet authenticated");
+    }
+  }
+
+  @Override
+  public ConnectionStateProcessor authenticate(Properties properties)
+      throws AuthenticationFailedException {
+    Subject subject = securityService.login(properties);
+    return new ConnectionShiroAuthorizingStateProcessor(securityService, subject);
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/state/ProtobufConnectionHandshakeStateProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/state/ProtobufConnectionHandshakeStateProcessor.java
new file mode 100644
index 0000000000..6d240885ad
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/state/ProtobufConnectionHandshakeStateProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.state;
+
+import 
org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
+import 
org.apache.geode.internal.protocol.state.LegacySecurityConnectionStateProcessor;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import 
org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
+import org.apache.geode.internal.protocol.OperationContext;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+import 
org.apache.geode.internal.protocol.protobuf.operations.HandshakeRequestOperationHandler;
+import org.apache.geode.internal.security.SecurityService;
+
+public class ProtobufConnectionHandshakeStateProcessor
+    implements ConnectionHandshakingStateProcessor {
+  private final SecurityService securityService;
+
+  public ProtobufConnectionHandshakeStateProcessor(SecurityService 
securityService) {
+    this.securityService = securityService;
+  }
+
+  @Override
+  public void validateOperation(MessageExecutionContext messageContext,
+      OperationContext operationContext) throws ConnectionStateException {
+    if (!(operationContext.getOperationHandler() instanceof 
HandshakeRequestOperationHandler)) {
+      throw new ConnectionStateException(ProtocolErrorCode.HANDSHAKE_REQUIRED,
+          "Protobuf handshake must be completed before any other operation.");
+    }
+  }
+
+  @Override
+  public ConnectionStateProcessor handshakeSucceeded() {
+    if (securityService.isIntegratedSecurity()) {
+      return new ConnectionShiroAuthenticatingStateProcessor(securityService);
+    } else if (securityService.isPeerSecurityRequired()
+        || securityService.isClientSecurityRequired()) {
+      return new LegacySecurityConnectionStateProcessor();
+    } else {
+      // Noop authenticator...no security
+      return new NoSecurityConnectionStateProcessor();
+    }
+  }
+}
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufResponseUtilities.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufResponseUtilities.java
index 663c31ef9d..183642ff98 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufResponseUtilities.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufResponseUtilities.java
@@ -18,6 +18,7 @@
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
+import 
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
@@ -56,4 +57,8 @@
             
BasicTypes.Error.newBuilder().setErrorCode(errorCode.codeValue).setMessage(message))
         .build();
   }
+
+  public static ClientProtocol.ErrorResponse 
makeErrorResponse(ConnectionStateException exception) {
+    return makeErrorResponse(exception.getErrorCode(), exception.getMessage());
+  }
 }
diff --git a/geode-protobuf/src/main/proto/clientProtocol.proto 
b/geode-protobuf/src/main/proto/clientProtocol.proto
index 4d8b94aa05..c7ff967d5a 100644
--- a/geode-protobuf/src/main/proto/clientProtocol.proto
+++ b/geode-protobuf/src/main/proto/clientProtocol.proto
@@ -25,7 +25,7 @@ import "google/protobuf/any.proto";
 import "region_API.proto";
 import "server_API.proto";
 import "basicTypes.proto";
-import "authentication_API.proto";
+import "connection_API.proto";
 
 message Message {
     oneof messageType {
@@ -49,6 +49,7 @@ message Request {
         GetRegionRequest getRegionRequest = 44;
 
         AuthenticationRequest authenticationRequest = 100;
+        HandshakeRequest handshakeRequest = 101;
     }
 }
 
@@ -68,6 +69,7 @@ message Response {
         GetRegionResponse getRegionResponse = 44;
 
         AuthenticationResponse authenticationResponse = 100;
+        HandshakeResponse handshakeResponse = 101;
     }
 }
 
diff --git a/geode-protobuf/src/main/proto/authentication_API.proto 
b/geode-protobuf/src/main/proto/connection_API.proto
similarity index 64%
rename from geode-protobuf/src/main/proto/authentication_API.proto
rename to geode-protobuf/src/main/proto/connection_API.proto
index cecc71793b..c0f5071d69 100644
--- a/geode-protobuf/src/main/proto/authentication_API.proto
+++ b/geode-protobuf/src/main/proto/connection_API.proto
@@ -16,6 +16,26 @@
 syntax = "proto3";
 package org.apache.geode.internal.protocol.protobuf;
 
+enum MajorVersions {
+    INVALID_MAJOR_VERSION = 0;  // Protobuf requires 0 based enum
+    CURRENT_MAJOR_VERSION = 1;  // Initial message structure and handshake protocol
+}
+enum MinorVersions {
+    INVALID_MINOR_VERSION = 0;  // Protobuf requires 0 based enum
+    CURRENT_MINOR_VERSION = 1;  // Protobuf implementation at initial release
+}
+
+message HandshakeRequest {
+    int32 majorVersion = 1;
+    int32 minorVersion = 2;
+}
+
+message HandshakeResponse {
+    int32 serverMajorVersion = 1;
+    int32 serverMinorVersion = 2;
+    bool handshakePassed = 3;
+}
+
 message AuthenticationRequest {
     map<string,string> credentials = 1;
 }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
index dabfa4cd58..1e632ca607 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
@@ -14,35 +14,14 @@
  */
 package org.apache.geode.internal.protocol;
 
-import static 
org.apache.geode.internal.protocol.ProtocolErrorCode.AUTHENTICATION_FAILED;
-import static 
org.apache.geode.internal.protocol.ProtocolErrorCode.UNSUPPORTED_AUTHENTICATION_MODE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
-import org.junit.experimental.categories.Category;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.protocol.protobuf.AuthenticationAPI;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
 import org.apache.geode.management.internal.security.ResourceConstants;
@@ -50,6 +29,26 @@
 import org.apache.geode.security.ResourcePermission;
 import org.apache.geode.security.SecurityManager;
 import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.geode.internal.protocol.ProtocolErrorCode.AUTHENTICATION_FAILED;
+import static 
org.apache.geode.internal.protocol.ProtocolErrorCode.UNSUPPORTED_AUTHENTICATION_MODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Security seems to have a few possible setups: * Manual SecurityManager set: integrated security *
@@ -78,7 +77,7 @@ public void setUp() {
     System.setProperty("geode.feature-protobuf-protocol", "true");
   }
 
-  public void setupCacheServerAndSocket() throws IOException {
+  public void setupCacheServerAndSocket() throws Exception {
     CacheServer cacheServer = cache.addCacheServer();
     int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
     cacheServer.setPort(cacheServerPort);
@@ -92,6 +91,15 @@ public void setupCacheServerAndSocket() throws IOException {
     outputStream.write(110);
 
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
+
+    ClientProtocol.Message.newBuilder()
+        .setRequest(ClientProtocol.Request.newBuilder()
+            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+                
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+                
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+        .build().writeDelimitedTo(outputStream);
+    ClientProtocol.Message handshakeResponse = 
protobufProtocolSerializer.deserialize(inputStream);
+    
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
   }
 
   private static class SimpleSecurityManager implements SecurityManager {
@@ -201,13 +209,13 @@ public void simpleAuthenticationSucceeds() throws 
Exception {
 
     ClientProtocol.Message authenticationRequest = 
ClientProtocol.Message.newBuilder()
         .setRequest(ClientProtocol.Request.newBuilder()
-            
.setAuthenticationRequest(AuthenticationAPI.AuthenticationRequest.newBuilder()
+            
.setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()
                 .putCredentials(ResourceConstants.USER_NAME, TEST_USERNAME)
                 .putCredentials(ResourceConstants.PASSWORD, TEST_PASSWORD)))
         .build();
     authenticationRequest.writeDelimitedTo(outputStream);
 
-    AuthenticationAPI.AuthenticationResponse authenticationResponse =
+    ConnectionAPI.AuthenticationResponse authenticationResponse =
         parseSimpleAuthenticationResponseFromInput();
     assertTrue(authenticationResponse.getAuthenticated());
 
@@ -227,14 +235,15 @@ public void simpleAuthenticationWithEmptyCreds() throws 
Exception {
     cache = createCacheWithSecurityManagerTakingExpectedCreds();
     setupCacheServerAndSocket();
 
-    ClientProtocol.Message authenticationRequest = 
ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            
.setAuthenticationRequest(AuthenticationAPI.AuthenticationRequest.newBuilder()))
-        .build();
+    ClientProtocol.Message authenticationRequest =
+        ClientProtocol.Message.newBuilder()
+            .setRequest(ClientProtocol.Request.newBuilder()
+                
.setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()))
+            .build();
 
     authenticationRequest.writeDelimitedTo(outputStream);
 
-    AuthenticationAPI.AuthenticationResponse authenticationResponse =
+    ConnectionAPI.AuthenticationResponse authenticationResponse =
         parseSimpleAuthenticationResponseFromInput();
     assertFalse(authenticationResponse.getAuthenticated());
   }
@@ -246,20 +255,20 @@ public void simpleAuthenticationWithInvalidCreds() throws 
Exception {
 
     ClientProtocol.Message authenticationRequest = 
ClientProtocol.Message.newBuilder()
         .setRequest(ClientProtocol.Request.newBuilder()
-            
.setAuthenticationRequest(AuthenticationAPI.AuthenticationRequest.newBuilder()
+            
.setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()
                 .putCredentials(ResourceConstants.USER_NAME, TEST_USERNAME)
                 .putCredentials(ResourceConstants.PASSWORD, "wrong password")))
         .build();
 
     authenticationRequest.writeDelimitedTo(outputStream);
 
-    AuthenticationAPI.AuthenticationResponse authenticationResponse =
+    ConnectionAPI.AuthenticationResponse authenticationResponse =
         parseSimpleAuthenticationResponseFromInput();
     assertFalse(authenticationResponse.getAuthenticated());
   }
 
   @Test
-  public void noAuthenticatorSet() throws IOException {
+  public void noAuthenticatorSet() throws Exception {
     cache = createNoSecurityCache();
     setupCacheServerAndSocket();
 
@@ -274,10 +283,11 @@ public void legacyClientAuthenticatorSet() throws 
Exception {
     createLegacyAuthCache("security-client-authenticator");
     setupCacheServerAndSocket();
 
-    ClientProtocol.Message authenticationRequest = 
ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            
.setAuthenticationRequest(AuthenticationAPI.AuthenticationRequest.newBuilder()))
-        .build();
+    ClientProtocol.Message authenticationRequest =
+        ClientProtocol.Message.newBuilder()
+            .setRequest(ClientProtocol.Request.newBuilder()
+                
.setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()))
+            .build();
 
     authenticationRequest.writeDelimitedTo(outputStream);
 
@@ -293,10 +303,11 @@ public void legacyPeerAuthenticatorSet() throws Exception 
{
     createLegacyAuthCache("security-peer-authenticator");
     setupCacheServerAndSocket();
 
-    ClientProtocol.Message authenticationRequest = 
ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            
.setAuthenticationRequest(AuthenticationAPI.AuthenticationRequest.newBuilder()))
-        .build();
+    ClientProtocol.Message authenticationRequest =
+        ClientProtocol.Message.newBuilder()
+            .setRequest(ClientProtocol.Request.newBuilder()
+                
.setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()))
+            .build();
 
     authenticationRequest.writeDelimitedTo(outputStream);
 
@@ -323,7 +334,7 @@ private void createLegacyAuthCache(String 
authenticationProperty) {
     cache = cacheFactory.create();
   }
 
-  private AuthenticationAPI.AuthenticationResponse 
parseSimpleAuthenticationResponseFromInput()
+  private ConnectionAPI.AuthenticationResponse 
parseSimpleAuthenticationResponseFromInput()
       throws IOException {
     ClientProtocol.Message authenticationResponseMessage =
         ClientProtocol.Message.parseDelimitedFrom(inputStream);
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
index 109e10df44..8f2390b997 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
@@ -14,34 +14,13 @@
  */
 package org.apache.geode.internal.protocol;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
-import org.junit.experimental.categories.Category;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.protocol.protobuf.AuthenticationAPI;
+import 
org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
@@ -50,6 +29,27 @@
 import org.apache.geode.security.ResourcePermission;
 import org.apache.geode.security.SecurityManager;
 import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @Category(IntegrationTest.class)
 public class AuthorizationIntegrationTest {
@@ -77,7 +77,7 @@
       new ResourcePermission(ResourcePermission.Resource.DATA, 
ResourcePermission.Operation.WRITE);
 
   @Before
-  public void setUp() throws IOException {
+  public void setUp() throws IOException, InvalidProtocolMessageException {
     Properties expectedAuthProperties = new Properties();
     expectedAuthProperties.setProperty(ResourceConstants.USER_NAME, 
TEST_USERNAME);
     expectedAuthProperties.setProperty(ResourceConstants.PASSWORD, 
TEST_PASSWORD);
@@ -113,9 +113,20 @@ public void setUp() throws IOException {
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
     when(mockSecurityManager.authorize(same(securityPrincipal), 
any())).thenReturn(false);
+
+    ClientProtocol.Message.newBuilder()
+        .setRequest(ClientProtocol.Request.newBuilder()
+            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+                
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+                
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+        .build().writeDelimitedTo(outputStream);
+    ClientProtocol.Message handshakeResponse =
+        ClientProtocol.Message.parseDelimitedFrom(inputStream);;
+    
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
     ClientProtocol.Message authenticationRequest = 
ClientProtocol.Message.newBuilder()
         .setRequest(ClientProtocol.Request.newBuilder()
-            
.setAuthenticationRequest(AuthenticationAPI.AuthenticationRequest.newBuilder()
+            
.setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()
                 .putCredentials(ResourceConstants.USER_NAME, TEST_USERNAME)
                 .putCredentials(ResourceConstants.PASSWORD, TEST_PASSWORD)))
         .build();
@@ -126,7 +137,7 @@ public void setUp() throws IOException {
         responseMessage.getMessageTypeCase().getNumber());
     assertEquals(ClientProtocol.Response.AUTHENTICATIONRESPONSE_FIELD_NUMBER,
         responseMessage.getResponse().getResponseAPICase().getNumber());
-    AuthenticationAPI.AuthenticationResponse authenticationResponse =
+    ConnectionAPI.AuthenticationResponse authenticationResponse =
         responseMessage.getResponse().getAuthenticationResponse();
     assertTrue(authenticationResponse.getAuthenticated());
   }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/ProtobufTestExecutionContext.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/ProtobufTestExecutionContext.java
index fc7a0814bc..9b84144cd0 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/ProtobufTestExecutionContext.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/ProtobufTestExecutionContext.java
@@ -16,18 +16,17 @@
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.internal.InternalLocator;
-import org.apache.geode.internal.protocol.security.NoOpAuthenticator;
-import org.apache.geode.internal.protocol.security.NoOpAuthorizer;
-import 
org.apache.geode.internal.protocol.security.processors.NoAuthenticationSecurityProcessor;
+import 
org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
 import org.apache.geode.internal.protocol.statistics.NoOpStatistics;
 
 public class ProtobufTestExecutionContext {
   public static MessageExecutionContext getNoAuthCacheExecutionContext(Cache 
cache) {
-    return new MessageExecutionContext(cache, new NoOpAuthenticator(), new 
NoOpAuthorizer(),
-        new Object(), new NoOpStatistics(), new 
NoAuthenticationSecurityProcessor());
+    return new MessageExecutionContext(cache, new NoOpStatistics(),
+        new NoSecurityConnectionStateProcessor());
   }
 
   public static MessageExecutionContext 
getLocatorExecutionContext(InternalLocator locator) {
-    return new MessageExecutionContext(locator, new NoOpStatistics());
+    return new MessageExecutionContext(locator, new NoOpStatistics(),
+        new NoSecurityConnectionStateProcessor());
   }
 }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
index a81a173e05..4257911e0a 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
@@ -15,37 +15,6 @@
 
 package org.apache.geode.internal.protocol.acceptance;
 
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_TYPE;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
 import org.apache.geode.Statistics;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
@@ -63,6 +32,7 @@
 import 
org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
@@ -70,6 +40,36 @@
 import org.apache.geode.internal.protocol.serialization.SerializationService;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.apache.geode.util.test.TestUtil;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_TYPE;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test that using the magic byte to indicate intend ot use ProtoBuf messages works
@@ -156,6 +156,17 @@ public void cleanup() throws IOException {
   @Test
   public void testBasicMessagesAndStats() throws Exception {
     ProtobufProtocolSerializer protobufProtocolSerializer = new 
ProtobufProtocolSerializer();
+
+    ClientProtocol.Message.newBuilder()
+        .setRequest(ClientProtocol.Request.newBuilder()
+            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+                
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+                
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+        .build().writeDelimitedTo(outputStream);
+    ClientProtocol.Message handshakeResponse =
+        ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
+    
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
     ClientProtocol.Message putMessage =
         MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, 
TEST_VALUE, TEST_REGION);
     protobufProtocolSerializer.serialize(putMessage, outputStream);
@@ -173,8 +184,8 @@ public void testBasicMessagesAndStats() throws Exception {
     assertEquals(1, protobufStats.length);
     Statistics statistics = protobufStats[0];
     assertEquals(1, statistics.get("currentClientConnections"));
-    assertEquals(2L, statistics.get("messagesReceived"));
-    assertEquals(2L, statistics.get("messagesSent"));
+    assertEquals(3L, statistics.get("messagesReceived"));
+    assertEquals(3L, statistics.get("messagesSent"));
     assertTrue(statistics.get("bytesReceived").longValue() > 0);
     assertTrue(statistics.get("bytesSent").longValue() > 0);
     assertEquals(1, statistics.get("clientConnectionStarts"));
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
index 565349a69e..a256e84f83 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
@@ -15,32 +15,6 @@
 
 package org.apache.geode.internal.protocol.acceptance;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.RegionFactory;
@@ -51,13 +25,39 @@
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.MessageUtil;
 import 
org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
 import 
org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
 import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test that using the magic byte to indicate intend ot use ProtoBuf messages works
@@ -189,6 +189,16 @@ private void validateSocketCreationAndDestruction(int 
cacheServerPort, int conne
           OutputStream outputStream = socket.getOutputStream();
           
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
 
+          ClientProtocol.Message.newBuilder()
+              .setRequest(ClientProtocol.Request.newBuilder()
+                  
.setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+                      
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+                      
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+              .build().writeDelimitedTo(outputStream);
+          ClientProtocol.Message handshakeResponse =
+              
ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
+          
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
           ClientProtocol.Message putMessage = MessageUtil
               .makePutRequestMessage(serializationService, TEST_KEY, 
TEST_VALUE, TEST_REGION);
           protobufProtocolSerializer.serialize(putMessage, outputStream);
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
index 4653cdb43a..c81f0d6095 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
@@ -15,37 +15,6 @@
 
 package org.apache.geode.internal.protocol.acceptance;
 
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_TYPE;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
-import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DataPolicy;
@@ -62,6 +31,7 @@
 import 
org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
@@ -72,6 +42,36 @@
 import 
org.apache.geode.internal.protocol.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.apache.geode.util.test.TestUtil;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_TYPE;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test operations using ProtoBuf
@@ -139,6 +139,16 @@ public void setup() throws Exception {
     outputStream = socket.getOutputStream();
     
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
 
+    ClientProtocol.Message.newBuilder()
+        .setRequest(ClientProtocol.Request.newBuilder()
+            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+                
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+                
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+        .build().writeDelimitedTo(outputStream);
+    ClientProtocol.Message handshakeResponse =
+        ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
+    
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
     serializationService = new ProtobufSerializationService();
   }
 
@@ -153,11 +163,6 @@ public void cleanup() throws IOException {
   public void testNewProtocolWithMultikeyOperations() throws Exception {
     System.setProperty("geode.feature-protobuf-protocol", "true");
 
-    Socket socket = new Socket("localhost", cacheServerPort);
-    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
-    OutputStream outputStream = socket.getOutputStream();
-    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
     ProtobufProtocolSerializer protobufProtocolSerializer = new 
ProtobufProtocolSerializer();
     Set<BasicTypes.Entry> putEntries = new HashSet<>();
     putEntries.add(ProtobufUtilities.createEntry(serializationService, 
TEST_MULTIOP_KEY1,
@@ -193,11 +198,6 @@ public void 
multiKeyOperationErrorsWithClasscastException() throws Exception {
     regionFactory.create(regionName);
     System.setProperty("geode.feature-protobuf-protocol", "true");
 
-    Socket socket = new Socket("localhost", cacheServerPort);
-    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
-    OutputStream outputStream = socket.getOutputStream();
-    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
     ProtobufProtocolSerializer protobufProtocolSerializer = new 
ProtobufProtocolSerializer();
     Set<BasicTypes.Entry> putEntries = new HashSet<>();
     putEntries.add(ProtobufUtilities.createEntry(serializationService, 2.2f, 
TEST_MULTIOP_VALUE1));
@@ -261,11 +261,6 @@ public void testNewProtocolGetRegionNamesCallSucceeds() 
throws Exception {
   public void testNewProtocolGetRegionCall() throws Exception {
     System.setProperty("geode.feature-protobuf-protocol", "true");
 
-    Socket socket = new Socket("localhost", cacheServerPort);
-    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
-    OutputStream outputStream = socket.getOutputStream();
-    
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
     ProtobufProtocolSerializer protobufProtocolSerializer = new 
ProtobufProtocolSerializer();
     ClientProtocol.Message getRegionMessage = 
MessageUtil.makeGetRegionRequestMessage(TEST_REGION);
     protobufProtocolSerializer.serialize(getRegionMessage, outputStream);
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufShiroAuthenticatorJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufShiroAuthenticatorJUnitTest.java
deleted file mode 100644
index 1dfdfd7f75..0000000000
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufShiroAuthenticatorJUnitTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.protocol.protobuf;
-
-import static junit.framework.TestCase.fail;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.shiro.subject.Subject;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import 
org.apache.geode.internal.protocol.protobuf.security.ProtobufShiroAuthenticator;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.management.internal.security.ResourceConstants;
-import org.apache.geode.security.AuthenticationFailedException;
-import org.apache.geode.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class ProtobufShiroAuthenticatorJUnitTest {
-  private static final String TEST_USERNAME = "user1";
-  private static final String TEST_PASSWORD = "hunter2";
-  // initialized with an incoming request in setUp.
-  private ByteArrayInputStream byteArrayInputStream;
-  private ByteArrayOutputStream byteArrayOutputStream;
-  private ProtobufShiroAuthenticator protobufShiroAuthenticator;
-  private SecurityService mockSecurityService;
-  private Subject mockSecuritySubject;
-  private Properties expectedAuthProperties;
-
-  @Before
-  public void setUp() throws IOException {
-    ClientProtocol.Message basicAuthenticationRequest = 
ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            
.setAuthenticationRequest(AuthenticationAPI.AuthenticationRequest.newBuilder()
-                .putCredentials(ResourceConstants.USER_NAME, TEST_USERNAME)
-                .putCredentials(ResourceConstants.PASSWORD, TEST_PASSWORD)))
-        .build();
-
-    expectedAuthProperties = new Properties();
-    expectedAuthProperties.setProperty(ResourceConstants.USER_NAME, 
TEST_USERNAME);
-    expectedAuthProperties.setProperty(ResourceConstants.PASSWORD, 
TEST_PASSWORD);
-
-    ByteArrayOutputStream messageStream = new ByteArrayOutputStream();
-    basicAuthenticationRequest.writeDelimitedTo(messageStream);
-    byteArrayInputStream = new 
ByteArrayInputStream(messageStream.toByteArray());
-    byteArrayOutputStream = new ByteArrayOutputStream();
-
-    mockSecuritySubject = mock(Subject.class);
-    mockSecurityService = mock(SecurityService.class);
-    
when(mockSecurityService.login(expectedAuthProperties)).thenReturn(mockSecuritySubject);
-
-    protobufShiroAuthenticator = new 
ProtobufShiroAuthenticator(mockSecurityService);
-  }
-
-  @Test
-  public void successfulAuthentication() throws IOException {
-
-    Properties properties = new Properties();
-    properties.setProperty(ResourceConstants.USER_NAME, TEST_USERNAME);
-    properties.setProperty(ResourceConstants.PASSWORD, TEST_PASSWORD);
-
-    Subject authenticate = protobufShiroAuthenticator.authenticate(properties);
-
-    assertNotNull(authenticate);
-  }
-
-  @Test(expected = AuthenticationFailedException.class)
-  public void failedAuthentication() throws IOException {
-    when(mockSecurityService.login(expectedAuthProperties))
-        .thenThrow(new AuthenticationFailedException("BOOM!"));
-
-    Properties properties = new Properties();
-    properties.setProperty(ResourceConstants.USER_NAME, TEST_USERNAME);
-    properties.setProperty(ResourceConstants.PASSWORD, TEST_PASSWORD);
-
-    protobufShiroAuthenticator.authenticate(properties);
-  }
-
-  @Test
-  public void authenticationRequestedWithNoCacheSecurity() throws IOException {
-    when(mockSecurityService.isIntegratedSecurity()).thenReturn(false);
-    when(mockSecurityService.isClientSecurityRequired()).thenReturn(false);
-    when(mockSecurityService.isPeerSecurityRequired()).thenReturn(false);
-
-    Properties properties = new Properties();
-    properties.setProperty(ResourceConstants.USER_NAME, TEST_USERNAME);
-    properties.setProperty(ResourceConstants.PASSWORD, TEST_PASSWORD);
-
-    Subject authenticate = protobufShiroAuthenticator.authenticate(properties);
-
-    assertNotNull(authenticate);
-  }
-}
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/HandshakeRequestOperationHandlerJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/HandshakeRequestOperationHandlerJUnitTest.java
new file mode 100644
index 0000000000..934ae39ad1
--- /dev/null
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/HandshakeRequestOperationHandlerJUnitTest.java
@@ -0,0 +1,171 @@
+package org.apache.geode.internal.protocol.protobuf.operations;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import 
org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import 
org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.Result;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.state.ConnectionShiroAuthenticatingStateProcessor;
+import 
org.apache.geode.internal.protocol.protobuf.state.ProtobufConnectionHandshakeStateProcessor;
+import 
org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.serialization.SerializationService;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.shiro.subject.Subject;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+@Category(UnitTest.class)
+public class HandshakeRequestOperationHandlerJUnitTest {
+  private static final int INVALID_MAJOR_VERSION = 67;
+  private static final int INVALID_MINOR_VERSION = 92347;
+
+  private HandshakeRequestOperationHandler handshakeHandler =
+      new HandshakeRequestOperationHandler();
+  private SerializationService serializationService = new 
ProtobufSerializationService();
+  private ProtobufConnectionHandshakeStateProcessor handshakeStateProcessor;
+
+  @Before
+  public void Setup() {
+    handshakeStateProcessor =
+        new 
ProtobufConnectionHandshakeStateProcessor(mock(SecurityService.class));
+  }
+
+  @Test
+  public void testCurrentVersionHandshakeSucceeds() throws Exception {
+    ConnectionAPI.HandshakeRequest handshakeRequest =
+        
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    MessageExecutionContext messageExecutionContext =
+        new MessageExecutionContext(mock(InternalCache.class), null, 
handshakeStateProcessor);
+    Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> 
result =
+        handshakeHandler.process(serializationService, handshakeRequest, 
messageExecutionContext);
+    ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage();
+    assertTrue(handshakeResponse.getHandshakePassed());
+    assertEquals(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+        handshakeResponse.getServerMajorVersion());
+    assertEquals(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE,
+        handshakeResponse.getServerMinorVersion());
+  }
+
+  @Test
+  public void testInvalidMajorVersionFails() throws Exception {
+    assertNotEquals(INVALID_MAJOR_VERSION, 
ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+
+    ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest(
+        INVALID_MAJOR_VERSION, 
ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    MessageExecutionContext messageExecutionContext =
+        new MessageExecutionContext(mock(InternalCache.class), null, 
handshakeStateProcessor);
+
+    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
+
+    // Also validate the protobuf INVALID_MAJOR_VERSION_VALUE constant fails
+    handshakeRequest =
+        
generateHandshakeRequest(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
+            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
+  }
+
+  private void verifyHandshakeFails(ConnectionAPI.HandshakeRequest 
handshakeRequest,
+      MessageExecutionContext messageExecutionContext) throws 
InvalidExecutionContextException {
+    Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> 
result =
+        handshakeHandler.process(serializationService, handshakeRequest, 
messageExecutionContext);
+    ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage();
+    assertFalse(handshakeResponse.getHandshakePassed());
+  }
+
+  @Test
+  public void testInvalidMinorVersionFails() throws Exception {
+    assertNotEquals(INVALID_MINOR_VERSION, 
ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+
+    ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest(
+        ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, 
INVALID_MINOR_VERSION);
+    MessageExecutionContext messageExecutionContext =
+        new MessageExecutionContext(mock(InternalCache.class), null, 
handshakeStateProcessor);
+
+    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
+
+    // Also validate the protobuf INVALID_MINOR_VERSION_VALUE constant fails
+    handshakeRequest =
+        
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+            ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE);
+    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
+  }
+
+  @Test
+  public void testNoSecurityStateFailsHandshake() throws Exception {
+    ConnectionAPI.HandshakeRequest handshakeRequest =
+        
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    MessageExecutionContext messageExecutionContext = new 
MessageExecutionContext(
+        mock(InternalCache.class), null, new 
NoSecurityConnectionStateProcessor());
+
+    Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> 
result =
+        handshakeHandler.process(serializationService, handshakeRequest, 
messageExecutionContext);
+    ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
+    assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
+        errorMessage.getError().getErrorCode());
+  }
+
+  @Test
+  public void testAuthenticatingStateFailsHandshake() throws Exception {
+    ConnectionAPI.HandshakeRequest handshakeRequest =
+        
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    MessageExecutionContext messageExecutionContext =
+        new MessageExecutionContext(mock(InternalCache.class), null,
+            new 
ConnectionShiroAuthenticatingStateProcessor(mock(SecurityService.class)));
+
+    Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> 
result =
+        handshakeHandler.process(serializationService, handshakeRequest, 
messageExecutionContext);
+    ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
+    assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
+        errorMessage.getError().getErrorCode());
+  }
+
+  @Test
+  public void testAuthorizingStateFailsHandshake() throws Exception {
+    ConnectionAPI.HandshakeRequest handshakeRequest =
+        
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    MessageExecutionContext messageExecutionContext =
+        new MessageExecutionContext(mock(InternalCache.class), null,
+            new 
ConnectionShiroAuthorizingStateProcessor(mock(SecurityService.class),
+                mock(Subject.class)));
+
+    Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> 
result =
+        handshakeHandler.process(serializationService, handshakeRequest, 
messageExecutionContext);
+    ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
+    assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
+        errorMessage.getError().getErrorCode());
+  }
+
+  /**
+   * Builds a HandshakeRequest carrying the given protocol versions.
+   *
+   * Argument order is (major, minor), matching every caller in this test class.
+   *
+   * @param majorVersion value handed to setMajorVersion on the request builder
+   * @param minorVersion value handed to setMinorVersion on the request builder
+   * @return the built HandshakeRequest
+   */
+  private ConnectionAPI.HandshakeRequest generateHandshakeRequest(int majorVersion,
+      int minorVersion) {
+    return ConnectionAPI.HandshakeRequest.newBuilder().setMajorVersion(majorVersion)
+        .setMinorVersion(minorVersion).build();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Create HandshakeRequest
> -----------------------
>
>                 Key: GEODE-3894
>                 URL: https://issues.apache.org/jira/browse/GEODE-3894
>             Project: Geode
>          Issue Type: Improvement
>          Components: client/server
>            Reporter: Brian Baynes
>
> Make Handshake a HandshakeRequest that is contained in a Message the same way 
> other requests are handled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to