[
https://issues.apache.org/jira/browse/GEODE-4059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283920#comment-16283920
]
ASF GitHub Bot commented on GEODE-4059:
---------------------------------------
bschuchardt closed pull request #1137: GEODE-4059: Changing protobuf handshake
to not need communication mod…
URL: https://github.com/apache/geode/pull/1137
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
index e0d18b3924..b55524f94e 100644
---
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
+++
b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
@@ -14,6 +14,10 @@
*/
package org.apache.geode.internal.protocol.state;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.geode.internal.protocol.MessageExecutionContext;
import org.apache.geode.internal.protocol.OperationContext;
import org.apache.geode.internal.protocol.ProtocolErrorCode;
@@ -48,15 +52,14 @@ default ConnectionAuthenticatingStateProcessor
allowAuthentication()
}
/**
- * This indicates whether this specific state processor is able to handle
handshake requests.
+ * Allow the state processor to take over the entire processing of a given
message.
*
- * @return specialized ConnectionHandshakingStateProcessor interface
implementation which can move
- * to a new state
- * @throws ConnectionStateException if unable to handle handshakes in this
state.
+ * @return - True if the message has been handled by the state processor,
false to continue normal
+ * processing.
*/
- default ConnectionHandshakingStateProcessor allowHandshake() throws
ConnectionStateException {
- throw new ConnectionStateException(ProtocolErrorCode.UNSUPPORTED_OPERATION,
- "Requested operation not allowed at this time");
+ default boolean handleMessageIndependently(InputStream inputStream,
OutputStream outputStream,
+ MessageExecutionContext executionContext) throws IOException {
+ return false;
}
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index aadbfb40bf..4e002d8c7f 100755
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -44,6 +44,7 @@
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.IncompatibleVersionException;
+import org.apache.geode.cache.UnsupportedVersionException;
import org.apache.geode.distributed.internal.ClusterConfigurationService;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
@@ -364,7 +365,6 @@ private void processRequest(final Socket socket) {
executor.execute(() -> {
long startTime = DistributionStats.getStatTime();
DataInputStream input = null;
- Object request, response;
try {
socket.setSoTimeout(READ_TIMEOUT);
@@ -379,89 +379,20 @@ private void processRequest(final Socket socket) {
+ (socket.getInetAddress().getHostAddress() + ":" +
socket.getPort()), e);
return;
}
- int gossipVersion = readGossipVersion(socket, input);
-
- short versionOrdinal;
- if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
- if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
- && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
- try {
- int protocolVersion = input.readUnsignedByte();
- ClientProtocolService clientProtocolService =
- clientProtocolServiceLoader.lookupService(protocolVersion);
- clientProtocolService.initializeStatistics("LocatorStats",
- internalLocator.getDistributedSystem());
- try (ClientProtocolProcessor pipeline =
-
clientProtocolService.createProcessorForLocator(internalLocator)) {
- pipeline.processMessage(input, socket.getOutputStream());
- } catch (IncompatibleVersionException e) {
- // should not happen on the locator as there is no handshake.
- log.error("Unexpected exception in client message processing",
e);
- }
- } catch (ServiceLoadingFailureException e) {
- log.error("There was an error looking up the client protocol
service", e);
- socket.close();
- throw new IOException("There was an error looking up the client
protocol service", e);
- } catch (ServiceVersionNotFoundException e) {
- log.error("Unable to find service matching the client protocol
version byte", e);
- socket.close();
- throw new IOException(
- "Unable to find service matching the client protocol version
byte", e);
- }
- } else {
- rejectUnknownProtocolConnection(socket, gossipVersion);
- }
- } else if (gossipVersion <= getCurrentGossipVersion()
- && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
- // Create a versioned stream to remember sender's GemFire version
- versionOrdinal = (short)
GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
-
- if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
- // Recent versions of TcpClient will send the version ordinal
- versionOrdinal = input.readShort();
- }
-
- if (log.isDebugEnabled() && versionOrdinal !=
Version.CURRENT_ORDINAL) {
- log.debug("Locator reading request from " +
socket.getInetAddress() + " with version "
- + Version.fromOrdinal(versionOrdinal, false));
- }
- input = new VersionedDataInputStream(input,
Version.fromOrdinal(versionOrdinal, false));
- request = DataSerializer.readObject(input);
- if (log.isDebugEnabled()) {
- log.debug("Locator received request " + request + " from " +
socket.getInetAddress());
- }
- if (request instanceof ShutdownRequest) {
- shuttingDown = true;
- // Don't call shutdown from within the worker thread, see java bug
#6576792.
- // Closing the socket will cause our acceptor thread to shutdown
the executor
- this.serverSocketPortAtClose = srv_sock.getLocalPort();
- srv_sock.close();
- response = new ShutdownResponse();
- } else if (request instanceof InfoRequest) {
- response = handleInfoRequest(request);
- } else if (request instanceof VersionRequest) {
- response = handleVersionRequest(request);
- } else {
- response = handler.processRequest(request);
- }
-
- handler.endRequest(request, startTime);
-
- startTime = DistributionStats.getStatTime();
- if (response != null) {
- DataOutputStream output = new
DataOutputStream(socket.getOutputStream());
- if (versionOrdinal != Version.CURRENT_ORDINAL) {
- output =
- new VersionedDataOutputStream(output,
Version.fromOrdinal(versionOrdinal, false));
- }
- DataSerializer.writeObject(response, output);
- output.flush();
- }
+ // read the first byte & check for an improperly configured client
pool trying
+ // to contact a cache server
+ int firstByte = input.readUnsignedByte();
+ if (firstByte == CommunicationMode.ReservedForGossip.getModeNumber()) {
+ processOneConnection(socket, startTime, input);
+ } else if (firstByte ==
CommunicationMode.ProtobufClientServerProtocol.getModeNumber()) {
+ handleProtobufConnection(socket, input);
+ } else if (CommunicationMode.isValidMode(firstByte)) {
+ socket.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR);
+ throw new Exception("Improperly configured client detected - use
addPoolLocator to "
+ + "configure its locators instead of addPoolServer.");
- handler.endResponse(request, startTime);
} else {
- // Close the socket. We can not accept requests from a newer version
- rejectUnknownProtocolConnection(socket, gossipVersion);
+ rejectUnknownProtocolConnection(socket, firstByte);
}
} catch (EOFException ignore) {
// client went away - ignore
@@ -516,32 +447,109 @@ private void processRequest(final Socket socket) {
});
}
- private void rejectUnknownProtocolConnection(Socket socket, int
gossipVersion)
- throws IOException {
+ private void processOneConnection(Socket socket, long startTime,
DataInputStream input)
+ throws IOException, UnsupportedVersionException, ClassNotFoundException {
+ // At this point we've read the leading byte of the gossip version and
found it to be 0,
+ // continue reading the next three bytes
+ int gossipVersion = 0;
+ for (int i = 0; i < 3; i++) {
+ gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte());
+ }
+
+ Object request;
+ Object response;
+ short versionOrdinal;
+ if (gossipVersion <= getCurrentGossipVersion()
+ && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
+ // Create a versioned stream to remember sender's GemFire version
+ versionOrdinal = (short)
GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
+
+ if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
+ // Recent versions of TcpClient will send the version ordinal
+ versionOrdinal = input.readShort();
+ }
+
+ if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
+ log.debug("Locator reading request from " + socket.getInetAddress() +
" with version "
+ + Version.fromOrdinal(versionOrdinal, false));
+ }
+ input = new VersionedDataInputStream(input,
Version.fromOrdinal(versionOrdinal, false));
+ request = DataSerializer.readObject(input);
+ if (log.isDebugEnabled()) {
+ log.debug("Locator received request " + request + " from " +
socket.getInetAddress());
+ }
+ if (request instanceof ShutdownRequest) {
+ shuttingDown = true;
+ // Don't call shutdown from within the worker thread, see java bug
#6576792.
+ // Closing the socket will cause our acceptor thread to shutdown the
executor
+ this.serverSocketPortAtClose = srv_sock.getLocalPort();
+ srv_sock.close();
+ response = new ShutdownResponse();
+ } else if (request instanceof InfoRequest) {
+ response = handleInfoRequest(request);
+ } else if (request instanceof VersionRequest) {
+ response = handleVersionRequest(request);
+ } else {
+ response = handler.processRequest(request);
+ }
+
+ handler.endRequest(request, startTime);
+
+ startTime = DistributionStats.getStatTime();
+ if (response != null) {
+ DataOutputStream output = new
DataOutputStream(socket.getOutputStream());
+ if (versionOrdinal != Version.CURRENT_ORDINAL) {
+ output =
+ new VersionedDataOutputStream(output,
Version.fromOrdinal(versionOrdinal, false));
+ }
+ DataSerializer.writeObject(response, output);
+ output.flush();
+ }
+
+ handler.endResponse(request, startTime);
+ } else {
+ // Close the socket. We can not accept requests from a newer version
+ rejectUnknownProtocolConnection(socket, gossipVersion);
+ }
+ }
+
+ private void rejectUnknownProtocolConnection(Socket socket, int
gossipVersion) {
try {
socket.getOutputStream().write("unknown protocol version".getBytes());
socket.getOutputStream().flush();
+ socket.close();
} catch (IOException e) {
log.debug("exception in sending reply to process using unknown protocol
" + gossipVersion, e);
}
- socket.close();
}
- private int readGossipVersion(Socket sock, DataInputStream input) throws
Exception {
- // read the first byte & check for an improperly configured client pool
trying
- // to contact a cache server
- int firstByte = input.readUnsignedByte();
- if (CommunicationMode.isValidMode(firstByte)) {
- sock.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR);
- throw new Exception("Improperly configured client detected - use
addPoolLocator to "
- + "configure its locators instead of addPoolServer.");
+ private void handleProtobufConnection(Socket socket, DataInputStream input)
throws Exception {
+ if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
+ log.warn("Incoming protobuf connection, but protobuf not enabled on this
locator.");
+ socket.close();
+ return;
}
- int gossipVersion = firstByte;
- for (int i = 0; i < 3; i++) {
- gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte());
+ try {
+ ClientProtocolService clientProtocolService =
clientProtocolServiceLoader.lookupService();
+ clientProtocolService.initializeStatistics("LocatorStats",
+ internalLocator.getDistributedSystem());
+ try (ClientProtocolProcessor pipeline =
+ clientProtocolService.createProcessorForLocator(internalLocator)) {
+ pipeline.processMessage(input, socket.getOutputStream());
+ } catch (IncompatibleVersionException e) {
+ // should not happen on the locator as there is no handshake.
+ log.error("Unexpected exception in client message processing", e);
+ }
+ } catch (ServiceLoadingFailureException e) {
+ log.error("There was an error looking up the client protocol service",
e);
+ socket.close();
+ throw new IOException("There was an error looking up the client protocol
service", e);
+ } catch (ServiceVersionNotFoundException e) {
+ log.error("Unable to find service matching the client protocol version
byte", e);
+ socket.close();
+ throw new IOException("Unable to find service matching the client
protocol version byte", e);
}
- return gossipVersion;
}
protected Object handleInfoRequest(Object request) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
index 4b66062ff0..bf85bbcc00 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
@@ -39,7 +39,7 @@ public ClientProtocolServiceLoader() {
return resultList;
}
- public ClientProtocolService lookupService(int protocolVersion) {
+ public ClientProtocolService lookupService() {
if (clientProtocolServices.isEmpty()) {
throw new ServiceLoadingFailureException(
"There is no ClientProtocolService implementation found in JVM");
@@ -50,10 +50,6 @@ public ClientProtocolService lookupService(int
protocolVersion) {
"There is more than one ClientProtocolService implementation found
in JVM; aborting");
}
ClientProtocolService clientProtocolService =
clientProtocolServices.get(0);
- if (clientProtocolService.getServiceProtocolVersion() != protocolVersion) {
- throw new ServiceVersionNotFoundException(
- "The ClientProtocolService doesn't match the requested version.");
- }
return clientProtocolService;
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
index 053a556b58..e57b850723 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
@@ -22,6 +22,19 @@
* query methods must be updated.
*/
public enum CommunicationMode {
+ /**
+ * The first byte of any locator connection will be the high order byte of
its gossip version,
+ * which will always be 0. Communication modes should not collide with this
value.
+ */
+ ReservedForGossip((byte) 0, "Locator gossip version"),
+ /**
+ * For the new client-server protocol.
+ *
+ * Protobuf handshake messages are specially constructed so that this value
will match the first
+ * byte sent, allowing clients to start protobuf connections with a
handshake instead of
+ * communication mode bytes.
+ */
+ ProtobufClientServerProtocol((byte) 10, "Protobuf client"),
/**
* Byte meaning that the Socket is being used for 'client to server'
communication.
*/
@@ -56,11 +69,7 @@
* Byte meaning that the Socket is being used for 'client to server'
messages related to a client
* queue (register interest, create cq, etc.).
*/
- ClientToServerForQueue((byte) 107, "clientToServerForQueue"),
- /**
- * For the new client-server protocol, which ignores the usual handshake
mechanism.
- */
- ProtobufClientServerProtocol((byte) 110, "Protobuf client");
+ ClientToServerForQueue((byte) 107, "clientToServerForQueue");
/**
* is this a client-initiated operations connection?
@@ -128,11 +137,13 @@ public byte getModeNumber() {
* check the given mode to see if it is assigned to one of the enumeration's
instances
*/
public static boolean isValidMode(int mode) {
- return 100 <= mode && mode <= 110;
+ return (100 <= mode && mode <= 107) || mode == 10;
}
public static CommunicationMode fromModeNumber(byte modeNumber) {
switch (modeNumber) {
+ case 10:
+ return ProtobufClientServerProtocol;
case 100:
return ClientToServer;
case 101:
@@ -149,8 +160,6 @@ public static CommunicationMode fromModeNumber(byte
modeNumber) {
return UnsuccessfulServerToClient;
case 107:
return ClientToServerForQueue;
- case 110:
- return ProtobufClientServerProtocol;
default:
throw new IllegalArgumentException("unknown communications mode: " +
modeNumber);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index 7070b3799c..59d4533e10 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -44,9 +44,9 @@ public ServerConnectionFactory() {
private synchronized ClientProtocolService getClientProtocolService(
- StatisticsFactory statisticsFactory, String serverName, int
protocolVersion) {
+ StatisticsFactory statisticsFactory, String serverName) {
if (clientProtocolService == null) {
- clientProtocolService =
clientProtocolServiceLoader.lookupService(protocolVersion);
+ clientProtocolService = clientProtocolServiceLoader.lookupService();
clientProtocolService.initializeStatistics(serverName,
statisticsFactory);
}
return clientProtocolService;
@@ -60,11 +60,9 @@ public ServerConnection makeServerConnection(Socket socket,
InternalCache cache,
if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
throw new IOException("Server received unknown communication mode: " +
communicationMode);
} else {
- int protocolVersion = readProtocolVersionByte(socket);
try {
return createGenericProtocolServerConnection(socket, cache, helper,
stats, hsTimeout,
- socketBufferSize, communicationModeStr, communicationMode,
acceptor, securityService,
- protocolVersion);
+ socketBufferSize, communicationModeStr, communicationMode,
acceptor, securityService);
} catch (ServiceLoadingFailureException ex) {
throw new IOException("Could not load protobuf client protocol", ex);
} catch (ServiceVersionNotFoundException ex) {
@@ -77,16 +75,12 @@ public ServerConnection makeServerConnection(Socket socket,
InternalCache cache,
}
}
- private int readProtocolVersionByte(Socket socket) throws IOException {
- return socket.getInputStream().read();
- }
-
private ServerConnection createGenericProtocolServerConnection(Socket
socket, InternalCache cache,
CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int
socketBufferSize,
String communicationModeStr, byte communicationMode, Acceptor acceptor,
- SecurityService securityService, int protocolVersion) {
- ClientProtocolService service =
getClientProtocolService(cache.getDistributedSystem(),
- acceptor.getServerName(), protocolVersion);
+ SecurityService securityService) {
+ ClientProtocolService service =
+ getClientProtocolService(cache.getDistributedSystem(),
acceptor.getServerName());
ClientProtocolProcessor processor = service.createProcessorForCache(cache,
securityService);
diff --git
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
b/geode-protobuf-messages/src/main/proto/handshake.proto
similarity index 50%
rename from
geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
rename to geode-protobuf-messages/src/main/proto/handshake.proto
index f885bfc16b..b3eebf679b 100644
---
a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
+++ b/geode-protobuf-messages/src/main/proto/handshake.proto
@@ -1,7 +1,7 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
* agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
*
@@ -12,17 +12,26 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-package org.apache.geode.internal.protocol.state;
-import
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+syntax = "proto3";
+package org.apache.geode.internal.protocol.protobuf;
-public interface ConnectionHandshakingStateProcessor extends
ConnectionStateProcessor {
- @Override
- default ConnectionHandshakingStateProcessor allowHandshake() throws
ConnectionStateException {
- return this;
- }
+enum MajorVersions {
+ INVALID_MAJOR_VERSION = 0; // Protobuf requires 0 based enum
+ CURRENT_MAJOR_VERSION = 1; // Initial message structure and handshake
protocol
+}
+enum MinorVersions {
+ INVALID_MINOR_VERSION = 0; // Protobuf requires 0 based enum
+ CURRENT_MINOR_VERSION = 1; // Protobuf implementation at initial release
+}
+
+message NewConnectionHandshake {
+ fixed32 majorVersion = 1;
+ fixed32 minorVersion = 2;
+}
- // This is called when a handshake operation succeeds to get the processor
for the next connection
- // state.
- ConnectionStateProcessor handshakeSucceeded();
+message HandshakeAcknowledgement {
+ int32 serverMajorVersion = 1;
+ int32 serverMinorVersion = 2;
+ bool handshakePassed = 3;
}
diff --git a/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto
b/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto
index 4fd0c96c0d..d6be91b232 100644
--- a/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto
@@ -22,9 +22,6 @@
syntax = "proto3";
package org.apache.geode.internal.protocol.protobuf.v1;
-import "google/protobuf/any.proto";
-import "google/protobuf/empty.proto";
-
message Entry {
EncodedValue key = 1;
EncodedValue value = 2;
diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
index 820c03d740..091536dc18 100644
--- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
@@ -21,7 +21,6 @@
syntax = "proto3";
package org.apache.geode.internal.protocol.protobuf.v1;
-import "google/protobuf/any.proto";
import "v1/region_API.proto";
import "v1/locator_API.proto";
import "v1/basicTypes.proto";
@@ -48,7 +47,6 @@ message Request {
GetRegionRequest getRegionRequest = 42;
AuthenticationRequest authenticationRequest = 100;
- HandshakeRequest handshakeRequest = 101;
}
}
@@ -66,7 +64,6 @@ message Response {
GetRegionResponse getRegionResponse = 42;
AuthenticationResponse authenticationResponse = 100;
- HandshakeResponse handshakeResponse = 101;
ErrorResponse errorResponse = 1000;
}
diff --git a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
index 176dc0d6f4..7c4435e126 100644
--- a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
@@ -16,26 +16,6 @@
syntax = "proto3";
package org.apache.geode.internal.protocol.protobuf.v1;
-enum MajorVersions {
- INVALID_MAJOR_VERSION = 0; // Protobuf requires 0 based enum
- CURRENT_MAJOR_VERSION = 1; // Initial message structure and handshake
protocol
-}
-enum MinorVersions {
- INVALID_MINOR_VERSION = 0; // Protobuf requires 0 based enum
- CURRENT_MINOR_VERSION = 1; // Protobuf implementation at initial release
-}
-
-message HandshakeRequest {
- int32 majorVersion = 1;
- int32 minorVersion = 2;
-}
-
-message HandshakeResponse {
- int32 serverMajorVersion = 1;
- int32 serverMinorVersion = 2;
- bool handshakePassed = 3;
-}
-
message AuthenticationRequest {
map<string,string> credentials = 1;
}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
index d67897f978..0a30ae4d79 100644
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
@@ -18,12 +18,16 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PushbackInputStream;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.IncompatibleVersionException;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
+import
org.apache.geode.internal.protocol.protobuf.v1.operations.VersionValidator;
import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
import
org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
@@ -34,6 +38,7 @@
private final InternalLocator locator;
private final ProtobufStreamProcessor streamProcessor;
private final ConnectionStateProcessor locatorConnectionState;
+ private final VersionValidator validator;
ProtobufLocatorPipeline(ProtobufStreamProcessor protobufStreamProcessor,
ProtocolClientStatistics statistics, InternalLocator locator) {
@@ -42,11 +47,13 @@
this.locator = locator;
this.statistics.clientConnected();
this.locatorConnectionState = new NoSecurityConnectionStateProcessor();
+ this.validator = new VersionValidator();
}
@Override
public void processMessage(InputStream inputStream, OutputStream
outputStream)
throws IOException, IncompatibleVersionException {
+ handleHandshakeMessage(inputStream);
streamProcessor.receiveMessage(inputStream, outputStream,
new MessageExecutionContext(locator, statistics,
locatorConnectionState));
}
@@ -61,4 +68,20 @@ public boolean socketProcessingIsFinished() {
// All locator connections are closed after one message, so this is not
used
return false;
}
+
+ private void handleHandshakeMessage(InputStream inputStream) throws
IOException {
+ // Incoming connection had the first byte removed to determine
communication mode, add that
+ // back before parsing.
+ PushbackInputStream handshakeStream = new PushbackInputStream(inputStream);
+
handshakeStream.unread(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
+ Handshake.NewConnectionHandshake handshakeRequest =
+ Handshake.NewConnectionHandshake.parseDelimitedFrom(handshakeStream);
+ int majorVersion = handshakeRequest.getMajorVersion();
+ int minorVersion = handshakeRequest.getMinorVersion();
+ if (!validator.isValid(majorVersion, minorVersion)) {
+ throw new IOException(
+ "Invalid protobuf client version number: " + majorVersion + "." +
minorVersion);
+ }
+ }
}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
index 63660a944e..23fc8045a9 100644
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
@@ -19,6 +19,7 @@
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
import
org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
import org.apache.geode.internal.protocol.statistics.NoOpStatistics;
import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
@@ -62,6 +63,6 @@ public ClientProtocolProcessor
createProcessorForLocator(InternalLocator locator
@Override
public int getServiceProtocolVersion() {
- return ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE;
+ return Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE;
}
}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
index 543454ffc7..be1c276189 100644
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
@@ -61,6 +61,11 @@ public void receiveMessage(InputStream inputStream,
OutputStream outputStream,
private void processOneMessage(InputStream inputStream, OutputStream
outputStream,
MessageExecutionContext executionContext)
throws InvalidProtocolMessageException, IOException {
+ if
(executionContext.getConnectionStateProcessor().handleMessageIndependently(inputStream,
+ outputStream, executionContext)) {
+ return;
+ }
+
ClientProtocol.Message message =
protobufProtocolSerializer.deserialize(inputStream);
if (message == null) {
String errorMessage = "Tried to deserialize protobuf message at EOF";
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java
new file mode 100644
index 0000000000..01321969ff
--- /dev/null
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.protocol.protobuf.Handshake;
+import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
+
+public class HandshakeHandler {
+ private static final Logger logger = LogManager.getLogger();
+ private static final VersionValidator validator = new VersionValidator();
+
+ public static boolean handleHandshake(InputStream inputStream, OutputStream
outputStream,
+ ProtocolClientStatistics statistics) throws IOException {
+ Handshake.NewConnectionHandshake handshakeRequest =
+ Handshake.NewConnectionHandshake.parseDelimitedFrom(inputStream);
+
+ statistics.messageReceived(handshakeRequest.getSerializedSize());
+
+ final boolean handshakeSucceeded =
+ validator.isValid(handshakeRequest.getMajorVersion(),
handshakeRequest.getMinorVersion());
+
+ Handshake.HandshakeAcknowledgement handshakeResponse =
Handshake.HandshakeAcknowledgement
+
.newBuilder().setServerMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+
.setServerMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE)
+ .setHandshakePassed(handshakeSucceeded).build();
+
+ handshakeResponse.writeDelimitedTo(outputStream);
+ statistics.messageSent(handshakeResponse.getSerializedSize());
+ if (!handshakeSucceeded) {
+ throw new IOException("Incompatible protobuf version.");
+ }
+
+ return handshakeSucceeded;
+ }
+}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java
deleted file mode 100644
index 97338e6d8f..0000000000
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf.v1.operations;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.protocol.Failure;
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.Result;
-import org.apache.geode.internal.protocol.Success;
-import org.apache.geode.internal.protocol.operations.OperationHandler;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
-import
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufResponseUtilities;
-import org.apache.geode.internal.protocol.serialization.SerializationService;
-import
org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor;
-import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
-import
org.apache.geode.internal.protocol.state.ConnectionTerminatingStateProcessor;
-import
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
-
-public class HandshakeRequestOperationHandler implements
- OperationHandler<ConnectionAPI.HandshakeRequest,
ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> {
- private static final Logger logger = LogManager.getLogger();
- private final VersionValidator validator = new VersionValidator();
-
- @Override
- public Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse>
process(
- SerializationService serializationService,
ConnectionAPI.HandshakeRequest request,
- MessageExecutionContext messageExecutionContext)
- throws InvalidExecutionContextException, ConnectionStateException {
- ConnectionHandshakingStateProcessor stateProcessor;
-
- // If handshake not allowed by this state this will throw a
ConnectionStateException
- stateProcessor =
messageExecutionContext.getConnectionStateProcessor().allowHandshake();
-
- final boolean handshakeSucceeded =
- validator.isValid(request.getMajorVersion(),
request.getMinorVersion());
- if (handshakeSucceeded) {
- ConnectionStateProcessor nextStateProcessor =
stateProcessor.handshakeSucceeded();
- messageExecutionContext.setConnectionStateProcessor(nextStateProcessor);
- } else {
- messageExecutionContext
- .setConnectionStateProcessor(new
ConnectionTerminatingStateProcessor());
- }
-
- return Success.of(ConnectionAPI.HandshakeResponse.newBuilder()
-
.setServerMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setServerMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)
- .setHandshakePassed(handshakeSucceeded).build());
- }
-}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java
index 86eea8603d..9d6065b58c 100644
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java
@@ -14,15 +14,15 @@
*/
package org.apache.geode.internal.protocol.protobuf.v1.operations;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
public class VersionValidator {
private int majorVersion;
private int minorVersion;
public VersionValidator() {
- this(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+ this(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+ Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
}
VersionValidator(int majorVersion, int minorVersion) {
@@ -31,9 +31,9 @@ public VersionValidator() {
}
public boolean isValid(int majorVersion, int minorVersion) {
- if (majorVersion != ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE
+ if (majorVersion != Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE
&& majorVersion == this.majorVersion) {
- if (minorVersion !=
ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE
+ if (minorVersion != Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE
&& minorVersion <= this.minorVersion) {
return true;
}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index 8109e6ff4b..065bd5d22a 100644
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -27,7 +27,6 @@
import
org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionNamesRequestOperationHandler;
import
org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionRequestOperationHandler;
import
org.apache.geode.internal.protocol.protobuf.v1.operations.GetRequestOperationHandler;
-import
org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeRequestOperationHandler;
import
org.apache.geode.internal.protocol.protobuf.v1.operations.PutAllRequestOperationHandler;
import
org.apache.geode.internal.protocol.protobuf.v1.operations.PutRequestOperationHandler;
import
org.apache.geode.internal.protocol.protobuf.v1.operations.RemoveRequestOperationHandler;
@@ -110,12 +109,5 @@ private void addContexts() {
opsResp ->
ClientProtocol.Response.newBuilder().setGetAvailableServersResponse(opsResp),
new ResourcePermission(ResourcePermission.Resource.CLUSTER,
ResourcePermission.Operation.READ)));
-
- operationContexts.put(RequestAPICase.HANDSHAKEREQUEST,
- new
ProtobufOperationContext<>(ClientProtocol.Request::getHandshakeRequest,
- new HandshakeRequestOperationHandler(),
- opsResp ->
ClientProtocol.Response.newBuilder().setHandshakeResponse(opsResp),
- new ResourcePermission(ResourcePermission.Resource.DATA,
- ResourcePermission.Operation.READ)));
}
}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
index 58fa33a566..ec39ec98ca 100644
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
@@ -14,19 +14,23 @@
*/
package org.apache.geode.internal.protocol.protobuf.v1.state;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+
+import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.protocol.MessageExecutionContext;
import org.apache.geode.internal.protocol.OperationContext;
import org.apache.geode.internal.protocol.ProtocolErrorCode;
-import
org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeRequestOperationHandler;
-import
org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor;
+import
org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeHandler;
import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
import
org.apache.geode.internal.protocol.state.LegacySecurityConnectionStateProcessor;
import
org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
import
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
import org.apache.geode.internal.security.SecurityService;
-public class ProtobufConnectionHandshakeStateProcessor
- implements ConnectionHandshakingStateProcessor {
+public class ProtobufConnectionHandshakeStateProcessor implements
ConnectionStateProcessor {
private final SecurityService securityService;
public ProtobufConnectionHandshakeStateProcessor(SecurityService
securityService) {
@@ -36,14 +40,11 @@ public
ProtobufConnectionHandshakeStateProcessor(SecurityService securityService
@Override
public void validateOperation(MessageExecutionContext messageContext,
OperationContext operationContext) throws ConnectionStateException {
- if (!(operationContext.getOperationHandler() instanceof
HandshakeRequestOperationHandler)) {
- throw new ConnectionStateException(ProtocolErrorCode.HANDSHAKE_REQUIRED,
- "Protobuf handshake must be completed before any other operation.");
- }
+ throw new ConnectionStateException(ProtocolErrorCode.GENERIC_FAILURE,
+ "Connection processing should never be asked to validate an
operation");
}
- @Override
- public ConnectionStateProcessor handshakeSucceeded() {
+ private ConnectionStateProcessor nextConnectionState() {
if (securityService.isIntegratedSecurity()) {
return new ConnectionShiroAuthenticatingStateProcessor(securityService);
} else if (securityService.isPeerSecurityRequired()
@@ -54,4 +55,19 @@ public ConnectionStateProcessor handshakeSucceeded() {
return new NoSecurityConnectionStateProcessor();
}
}
+
+ @Override
+ public boolean handleMessageIndependently(InputStream inputStream,
OutputStream outputStream,
+ MessageExecutionContext executionContext) throws IOException {
+ // inputStream will have had the first byte stripped off to determine
communication mode, add
+ // that byte back before processing message
+ PushbackInputStream messageStream = new PushbackInputStream(inputStream);
+
messageStream.unread(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
+ if (HandshakeHandler.handleHandshake(messageStream, outputStream,
+ executionContext.getStatistics())) {
+ executionContext.setConnectionStateProcessor(nextConnectionState());
+ }
+ return true;
+ }
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java
index c3b6c7364f..f3f68f401f 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java
@@ -87,19 +87,10 @@ public void setupCacheServerAndSocket() throws Exception {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
protobufProtocolSerializer = new ProtobufProtocolSerializer();
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
protobufProtocolSerializer.deserialize(inputStream);
-
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+ MessageUtil.performAndVerifyHandshake(socket);
}
private static class SimpleSecurityManager implements SecurityManager {
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
index c78a75dcf7..cc69333b6a 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
@@ -107,23 +107,13 @@ public void setUp() throws IOException,
InvalidProtocolMessageException {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
serializationService = new ProtobufSerializationService();
protobufProtocolSerializer = new ProtobufProtocolSerializer();
when(mockSecurityManager.authorize(same(securityPrincipal),
any())).thenReturn(false);
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
- ClientProtocol.Message.parseDelimitedFrom(inputStream);;
-
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+ MessageUtil.performAndVerifyHandshake(socket);
ClientProtocol.Message authenticationRequest =
ClientProtocol.Message.newBuilder()
.setRequest(ClientProtocol.Request.newBuilder()
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java
index de3038f88d..0d3fd24888 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java
@@ -18,6 +18,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -40,8 +41,10 @@
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
import
org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -96,48 +99,18 @@ public void tearDown() {
@Test
public void testNormalHandshakeSucceeds() throws Exception {
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
protobufProtocolSerializer.deserialize(inputStream);
-
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+ MessageUtil.performAndVerifyHandshake(socket);
}
@Test
public void testInvalidMajorVersionBreaksConnection() throws Exception {
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE);
+ Handshake.NewConnectionHandshake.newBuilder().setMajorVersion(2000)
+
.setMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
+ .writeDelimitedTo(socket.getOutputStream());
- // Verify that connection is closed
- Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
- try {
- assertEquals(-1, socket.getInputStream().read()); // EOF implies
disconnected.
- return true;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- @Test
- public void testInvalidMinorVersionBreaksConnectionAfterResponse() throws
Exception {
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
protobufProtocolSerializer.deserialize(inputStream);
-
assertFalse(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+ Handshake.HandshakeAcknowledgement handshakeResponse =
+
Handshake.HandshakeAcknowledgement.parseDelimitedFrom(socket.getInputStream());
+ assertFalse(handshakeResponse.getHandshakePassed());
// Verify that connection is closed
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
@@ -150,29 +123,15 @@ public void
testInvalidMinorVersionBreaksConnectionAfterResponse() throws Except
});
}
+ /**
+ * Protobuf seems to omit values that are set to their default (0). This
ruins the serialization
+ * trick we use because the message size changes.
+ */
@Test
- public void testUnexpectedHandshakeFailsAndClosesConnection() throws
Exception {
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
protobufProtocolSerializer.deserialize(inputStream);
-
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
-
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message failingHandshake =
protobufProtocolSerializer.deserialize(inputStream);
- assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
-
failingHandshake.getResponse().getErrorResponse().getError().getErrorCode());
+ public void testMissingMajorVersionBreaksConnection() throws Exception {
+ Handshake.NewConnectionHandshake.newBuilder()
+
.setMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE).setMinorVersion(0)
+ .build().writeDelimitedTo(socket.getOutputStream());
// Verify that connection is closed
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
@@ -180,7 +139,8 @@ public void
testUnexpectedHandshakeFailsAndClosesConnection() throws Exception {
assertEquals(-1, socket.getInputStream().read()); // EOF implies
disconnected.
return true;
} catch (IOException e) {
- throw new RuntimeException(e);
+ // Ignore IOExceptions (sometimes socket reset exception is thrown)
+ return true;
}
});
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java
index 5a10b1f0af..c998f4a22b 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java
@@ -14,6 +14,16 @@
*/
package org.apache.geode.internal.protocol.protobuf.v1;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+import com.google.protobuf.MessageLite;
+
+import org.apache.geode.internal.protocol.protobuf.Handshake;
import
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities;
import
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
import org.apache.geode.internal.protocol.serialization.SerializationService;
@@ -22,6 +32,24 @@
public class MessageUtil {
+ public static void performAndVerifyHandshake(Socket socket) throws
IOException {
+ sendHandshake(socket);
+ verifyHandshakeSuccess(socket);
+ }
+
+ public static void verifyHandshakeSuccess(Socket socket) throws IOException {
+ Handshake.HandshakeAcknowledgement handshakeResponse =
+
Handshake.HandshakeAcknowledgement.parseDelimitedFrom(socket.getInputStream());
+ assertTrue(handshakeResponse.getHandshakePassed());
+ }
+
+ public static void sendHandshake(Socket socket) throws IOException {
+ Handshake.NewConnectionHandshake.newBuilder()
+ .setMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+
.setMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
+ .writeDelimitedTo(socket.getOutputStream());
+ }
+
public static RegionAPI.GetRegionRequest makeGetRegionRequest(String
requestRegion) {
return
RegionAPI.GetRegionRequest.newBuilder().setRegionName(requestRegion).build();
}
@@ -68,4 +96,14 @@
private static RegionAPI.GetRequest.Builder getGetRequestBuilder() {
return RegionAPI.GetRequest.newBuilder();
}
+
+ public static ByteArrayInputStream
writeMessageDelimitedToInputStream(MessageLite message) {
+ try {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ message.writeDelimitedTo(output);
+ return new ByteArrayInputStream(output.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e); // never happens.
+ }
+ }
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java
index bf1127a8cd..7f6327d649 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java
@@ -143,8 +143,8 @@ public void setup() throws Exception {
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+
+ MessageUtil.performAndVerifyHandshake(socket);
serializationService = new ProtobufSerializationService();
}
@@ -160,16 +160,6 @@ public void cleanup() throws IOException {
public void testBasicMessagesAndStats() throws Exception {
ProtobufProtocolSerializer protobufProtocolSerializer = new
ProtobufProtocolSerializer();
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
- ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
-
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
-
ClientProtocol.Message putMessage =
MessageUtil.makePutRequestMessage(serializationService, TEST_KEY,
TEST_VALUE, TEST_REGION);
protobufProtocolSerializer.serialize(putMessage, outputStream);
@@ -204,7 +194,7 @@ public void testConnectionCountIsProperlyDecremented()
throws Exception {
CacheServer cacheServer = cacheServers.stream().findFirst().get();
AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> acceptor.getClientServerCnxCount() == 1);
// make a request to the server
@@ -216,7 +206,7 @@ public void testConnectionCountIsProperlyDecremented()
throws Exception {
// make sure socket is still open
assertFalse(socket.isClosed());
socket.close();
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> acceptor.getClientServerCnxCount() == 0);
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java
index b14ceb27a0..4adf37e29a 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java
@@ -106,8 +106,7 @@ public void setup() throws Exception {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+ MessageUtil.performAndVerifyHandshake(socket);
serializationService = new ProtobufSerializationService();
@@ -155,14 +154,6 @@ public void testUnresponsiveClientsGetDisconnected()
throws Exception {
public void testResponsiveClientsStaysConnected() throws Exception {
ProtobufProtocolSerializer protobufProtocolSerializer = new
ProtobufProtocolSerializer();
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- protobufProtocolSerializer.deserialize(socket.getInputStream());
-
ClientProtocol.Message putMessage =
MessageUtil.makePutRequestMessage(serializationService, TEST_KEY,
TEST_VALUE, TEST_REGION);
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java
index 43287e00a2..f9489bf28f 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java
@@ -15,6 +15,7 @@
package org.apache.geode.internal.protocol.protobuf.v1.acceptance;
+import static
org.apache.geode.internal.protocol.protobuf.v1.MessageUtil.performAndVerifyHandshake;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -53,7 +54,6 @@
import org.apache.geode.internal.net.SocketCreatorFactory;
import
org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
import
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
import
org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
@@ -102,9 +102,6 @@ public void setup() throws Exception {
socket = new Socket("localhost", cacheServerPort);
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
- OutputStream outputStream = socket.getOutputStream();
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
serializationService = new ProtobufSerializationService();
protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -189,18 +186,8 @@ private void validateSocketCreationAndDestruction(int
cacheServerPort, int conne
Awaitility.await().atMost(5,
TimeUnit.SECONDS).until(socket::isConnected);
OutputStream outputStream = socket.getOutputStream();
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
-
.setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
-
ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
-
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
+ performAndVerifyHandshake(socket);
ClientProtocol.Message putMessage = MessageUtil
.makePutRequestMessage(serializationService, TEST_KEY,
TEST_VALUE, TEST_REGION);
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
index b114a21ba4..a828fa5376 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
@@ -138,18 +138,8 @@ public void setup() throws Exception {
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
-
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
- ClientProtocol.Message.newBuilder()
- .setRequest(ClientProtocol.Request.newBuilder()
- .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
- .build().writeDelimitedTo(outputStream);
- ClientProtocol.Message handshakeResponse =
- ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
-
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
+ MessageUtil.performAndVerifyHandshake(socket);
serializationService = new ProtobufSerializationService();
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
index ac432af9ca..fc52d33068 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
@@ -41,6 +41,7 @@
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
import
org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
import
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities;
import
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
@@ -72,11 +73,7 @@ private Socket createSocket() throws IOException {
Host host = Host.getHost(0);
int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
Socket socket = new Socket(host.getHostName(), locatorPort);
- DataOutputStream dataOutputStream = new
DataOutputStream(socket.getOutputStream());
- dataOutputStream.writeInt(0);
- // Using the constant from AcceptorImpl to ensure that magic byte is the
same
- dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber());
-
dataOutputStream.writeByte(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+ MessageUtil.sendHandshake(socket);
return socket;
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java
new file mode 100644
index 0000000000..098b1a2720
--- /dev/null
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java
@@ -0,0 +1,140 @@
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.shiro.subject.Subject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.Result;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
+import
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import
org.apache.geode.internal.protocol.protobuf.v1.state.ConnectionShiroAuthenticatingStateProcessor;
+import
org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
+import
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.serialization.SerializationService;
+import
org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor;
+import
org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
+import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+@Category(UnitTest.class)
+public class HandshakeHandlerJUnitTest {
+ private static final int INVALID_MAJOR_VERSION = 67;
+ private static final int INVALID_MINOR_VERSION = 92347;
+
+ private HandshakeHandler handshakeHandler = new HandshakeHandler();
+
+ @Test
+ public void testCurrentVersionHandshakeSucceeds() throws Exception {
+ Handshake.NewConnectionHandshake handshakeRequest =
+
generateHandshakeRequest(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+ Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+
+ ByteArrayInputStream inputStream =
+ MessageUtil.writeMessageDelimitedToInputStream(handshakeRequest);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ assertTrue(handshakeHandler.handleHandshake(inputStream, outputStream,
+ mock(ProtocolClientStatistics.class)));
+
+ Handshake.HandshakeAcknowledgement handshakeResponse =
Handshake.HandshakeAcknowledgement
+ .parseDelimitedFrom(new
ByteArrayInputStream(outputStream.toByteArray()));
+ assertTrue(handshakeResponse.getHandshakePassed());
+ assertEquals(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+ handshakeResponse.getServerMajorVersion());
+ assertEquals(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE,
+ handshakeResponse.getServerMinorVersion());
+ }
+
+ @Test
+ public void testInvalidMajorVersionFails() throws Exception {
+ assertNotEquals(INVALID_MAJOR_VERSION,
Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+
+ Handshake.NewConnectionHandshake handshakeRequest =
generateHandshakeRequest(
+ INVALID_MAJOR_VERSION,
Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+
+ verifyHandshakeFails(handshakeRequest);
+
+ // Also validate the protobuf INVALID_MAJOR_VERSION_VALUE constant fails
+ handshakeRequest =
generateHandshakeRequest(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
+ Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+ verifyHandshakeFails(handshakeRequest);
+ }
+
+ private void verifyHandshakeFails(Handshake.NewConnectionHandshake
handshakeRequest)
+ throws Exception {
+ ByteArrayInputStream inputStream =
+ MessageUtil.writeMessageDelimitedToInputStream(handshakeRequest);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ try {
+ handshakeHandler.handleHandshake(inputStream, outputStream,
+ mock(ProtocolClientStatistics.class));
+ fail("Invalid handshake should throw IOException");
+ } catch (IOException e) {
+ // expected if handshake verification fails
+ }
+
+ Handshake.HandshakeAcknowledgement handshakeResponse =
Handshake.HandshakeAcknowledgement
+ .parseDelimitedFrom(new
ByteArrayInputStream(outputStream.toByteArray()));
+
+ assertFalse(handshakeResponse.getHandshakePassed());
+ }
+
+ @Test
+ public void testInvalidMinorVersionFails() throws Exception {
+ assertNotEquals(INVALID_MINOR_VERSION,
Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+
+ Handshake.NewConnectionHandshake handshakeRequest =
generateHandshakeRequest(
+ Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
INVALID_MINOR_VERSION);
+
+ verifyHandshakeFails(handshakeRequest);
+
+ // Also validate the protobuf INVALID_MINOR_VERSION_VALUE constant fails
+ handshakeRequest =
generateHandshakeRequest(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+ Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE);
+ verifyHandshakeFails(handshakeRequest);
+ }
+
+ private Handshake.NewConnectionHandshake generateHandshakeRequest(int
majorVersion,
+ int minorVersion) {
+ return
Handshake.NewConnectionHandshake.newBuilder().setMajorVersion(majorVersion)
+ .setMinorVersion(minorVersion).build();
+ }
+}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java
deleted file mode 100644
index 0baf9bb8ea..0000000000
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package org.apache.geode.internal.protocol.protobuf.v1.operations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import org.apache.shiro.subject.Subject;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.protocol.Failure;
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.Result;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
-import
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
-import
org.apache.geode.internal.protocol.protobuf.v1.state.ConnectionShiroAuthenticatingStateProcessor;
-import
org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
-import
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufResponseUtilities;
-import org.apache.geode.internal.protocol.serialization.SerializationService;
-import
org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor;
-import
org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
-import
org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.test.junit.categories.UnitTest;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-@Category(UnitTest.class)
-public class HandshakeRequestOperationHandlerJUnitTest {
- private static final int INVALID_MAJOR_VERSION = 67;
- private static final int INVALID_MINOR_VERSION = 92347;
-
- private HandshakeRequestOperationHandler handshakeHandler =
- new HandshakeRequestOperationHandler();
- private SerializationService serializationService = new
ProtobufSerializationService();
- private ProtobufConnectionHandshakeStateProcessor handshakeStateProcessor;
-
- @Before
- public void Setup() {
- handshakeStateProcessor =
- new
ProtobufConnectionHandshakeStateProcessor(mock(SecurityService.class));
- }
-
- @Test
- public void testCurrentVersionHandshakeSucceeds() throws Exception {
- ConnectionAPI.HandshakeRequest handshakeRequest =
-
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
- MessageExecutionContext messageExecutionContext =
- new MessageExecutionContext(mock(InternalCache.class), null,
handshakeStateProcessor);
- Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse>
result =
- handshakeHandler.process(serializationService, handshakeRequest,
messageExecutionContext);
- ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage();
- assertTrue(handshakeResponse.getHandshakePassed());
- assertEquals(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
- handshakeResponse.getServerMajorVersion());
- assertEquals(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE,
- handshakeResponse.getServerMinorVersion());
- }
-
- @Test
- public void testInvalidMajorVersionFails() throws Exception {
- assertNotEquals(INVALID_MAJOR_VERSION,
ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
- ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest(
- INVALID_MAJOR_VERSION,
ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
- MessageExecutionContext messageExecutionContext =
- new MessageExecutionContext(mock(InternalCache.class), null,
handshakeStateProcessor);
-
- verifyHandshakeFails(handshakeRequest, messageExecutionContext);
- }
-
- @Test
- public void testInvalidMajorVersionProtocolConstantFails() throws Exception {
- ConnectionAPI.HandshakeRequest handshakeRequest =
-
generateHandshakeRequest(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
- MessageExecutionContext messageExecutionContext =
- new MessageExecutionContext(mock(InternalCache.class), null,
handshakeStateProcessor);
- verifyHandshakeFails(handshakeRequest, messageExecutionContext);
- }
-
- private void verifyHandshakeFails(ConnectionAPI.HandshakeRequest
handshakeRequest,
- MessageExecutionContext messageExecutionContext) throws Exception {
- Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse>
result =
- handshakeHandler.process(serializationService, handshakeRequest,
messageExecutionContext);
- ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage();
- assertFalse(handshakeResponse.getHandshakePassed());
- }
-
- @Test
- public void testInvalidMinorVersionFails() throws Exception {
- assertNotEquals(INVALID_MINOR_VERSION,
ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-
- ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest(
- ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
INVALID_MINOR_VERSION);
- MessageExecutionContext messageExecutionContext =
- new MessageExecutionContext(mock(InternalCache.class), null,
handshakeStateProcessor);
-
- verifyHandshakeFails(handshakeRequest, messageExecutionContext);
- }
-
- @Test
- public void testInvalidMinorVersionProtocolConstantFails() throws Exception {
- ConnectionAPI.HandshakeRequest handshakeRequest =
-
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE);
- MessageExecutionContext messageExecutionContext =
- new MessageExecutionContext(mock(InternalCache.class), null,
handshakeStateProcessor);
-
- verifyHandshakeFails(handshakeRequest, messageExecutionContext);
- }
-
- @Test
- public void testNoSecurityStateFailsHandshake() throws Exception {
- ConnectionAPI.HandshakeRequest handshakeRequest =
-
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
- MessageExecutionContext messageExecutionContext = new
MessageExecutionContext(
- mock(InternalCache.class), null, new
NoSecurityConnectionStateProcessor());
-
- try {
- handshakeHandler.process(serializationService, handshakeRequest,
messageExecutionContext);
- fail("Handshake in non-handshake state should throw exception");
- } catch (ConnectionStateException ex) {
- assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode());
- }
- }
-
- @Test
- public void testAuthenticatingStateFailsHandshake() throws Exception {
- ConnectionAPI.HandshakeRequest handshakeRequest =
-
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
- MessageExecutionContext messageExecutionContext =
- new MessageExecutionContext(mock(InternalCache.class), null,
- new
ConnectionShiroAuthenticatingStateProcessor(mock(SecurityService.class)));
-
- try {
- handshakeHandler.process(serializationService, handshakeRequest,
messageExecutionContext);
- fail("Handshake in non-handshake state should throw exception");
- } catch (ConnectionStateException ex) {
- assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode());
- }
- }
-
- @Test
- public void testAuthorizingStateFailsHandshake() throws Exception {
- ConnectionAPI.HandshakeRequest handshakeRequest =
-
generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
- MessageExecutionContext messageExecutionContext =
- new MessageExecutionContext(mock(InternalCache.class), null,
- new
ConnectionShiroAuthorizingStateProcessor(mock(SecurityService.class),
- mock(Subject.class)));
-
- try {
- handshakeHandler.process(serializationService, handshakeRequest,
messageExecutionContext);
- fail("Handshake in non-handshake state should throw exception");
- } catch (ConnectionStateException ex) {
- assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode());
- }
- }
-
- private ConnectionAPI.HandshakeRequest generateHandshakeRequest(int
majorVersion,
- int minorVersion) {
- return
ConnectionAPI.HandshakeRequest.newBuilder().setMajorVersion(majorVersion)
- .setMinorVersion(minorVersion).build();
- }
-}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java
index b59d154629..e83e6e0277 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java
@@ -20,7 +20,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
import org.apache.geode.test.junit.categories.UnitTest;
@Category(UnitTest.class)
@@ -33,11 +33,11 @@
@Test
public void testInvalidVersions() throws Exception {
assertFalse(
- validator.isValid(MAJOR_VERSION,
ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE));
+ validator.isValid(MAJOR_VERSION,
Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE));
assertFalse(
-
validator.isValid(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
MINOR_VERSION));
-
assertFalse(validator.isValid(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
- ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE));
+ validator.isValid(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
MINOR_VERSION));
+
assertFalse(validator.isValid(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
+ Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE));
}
@Test
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Stop using magic bytes in protobuf connection establishment
> -----------------------------------------------------------
>
> Key: GEODE-4059
> URL: https://issues.apache.org/jira/browse/GEODE-4059
> Project: Geode
> Issue Type: New Feature
> Components: client/server
> Reporter: Brian Rowe
>
> When connection to the server, the client currently uses some magic bytes to
> identify the protocol being used. This is non-intuitive and problematic for
> client developers, and should not be necessary if we make clever use of a
> magic protobuf connection establishing message. This should also serve as
> our handshake.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)