This is an automated email from the ASF dual-hosted git repository. pivotalsarge pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new b9d9b38 GEODE-4401: Add disconnect client message to server and driver. (#1525) b9d9b38 is described below commit b9d9b38e42686a1973cee51f89053eb8b2519ec4 Author: Michael "Sarge" Dodge <mdo...@pivotal.io> AuthorDate: Fri Mar 2 11:43:12 2018 -0800 GEODE-4401: Add disconnect client message to server and driver. (#1525) * GEODE-4401: Add disconnect client message to server and driver. * GEODE-4401: Address review comments. * GEODE-4401: Update the driver to use the channel instead of a socket. --- .../geode/experimental/driver/ProtobufDriver.java | 29 +++-- .../experimental/driver/DriverConnectionTest.java | 6 - .../src/main/proto/v1/clientProtocol.proto | 3 + .../src/main/proto/v1/connection_API.proto | 8 ++ .../DisconnectClientRequestOperationHandler.java | 43 +++++++ .../registry/ProtobufOperationContextRegistry.java | 8 +- .../v1/DisconnectClientIntegrationTest.java | 137 +++++++++++++++++++++ 7 files changed, 218 insertions(+), 16 deletions(-) diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java index 40e435c..9b233f7 100644 --- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java @@ -15,20 +15,16 @@ package org.apache.geode.experimental.driver; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.Socket; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import org.apache.geode.annotations.Experimental; -import org.apache.geode.internal.protocol.protobuf.ProtocolVersion; -import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase; -import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI; +import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI; import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionNamesRequest; @@ -84,9 +80,24 @@ public class ProtobufDriver implements Driver { @Override public void close() { try { - this.channel.close(); - } catch (IOException e) { - // ignore + final Message disconnectClientRequest = ClientProtocol.Message.newBuilder() + .setDisconnectClientRequest( + ConnectionAPI.DisconnectClientRequest.newBuilder().setReason("Driver closed")) + .build(); + final ConnectionAPI.DisconnectClientResponse disconnectClientResponse = + channel.sendRequest(disconnectClientRequest, MessageTypeCase.DISCONNECTCLIENTRESPONSE) + .getDisconnectClientResponse(); + if (Objects.isNull(disconnectClientResponse)) { + // The server did not acknowledge the disconnect request; ignore for now. + } + } catch (IOException ioe) { + // NOP + } finally { + try { + this.channel.close(); + } catch (IOException e) { + // ignore + } } } diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java index c840d9e..6e03607 100644 --- a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java +++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/DriverConnectionTest.java @@ -15,9 +15,7 @@ package org.apache.geode.experimental.driver; import static org.apache.geode.internal.Assert.assertTrue; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import java.io.IOException; import java.util.Properties; @@ -31,11 +29,9 @@ import org.junit.experimental.categories.Category; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.ConfigurationProperties; import org.apache.geode.distributed.Locator; -import org.apache.geode.pdx.PdxInstance; import org.apache.geode.test.junit.categories.IntegrationTest; @Category(IntegrationTest.class) @@ -99,6 +95,4 @@ public class DriverConnectionTest { driver.close(); assertFalse(driver.isConnected()); } - - } diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto index 184d6c4..22ab5c4 100644 --- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto +++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto @@ -72,6 +72,9 @@ message Message { KeySetRequest keySetRequest = 28; KeySetResponse keySetResponse = 29; + + DisconnectClientRequest disconnectClientRequest = 30; + DisconnectClientResponse disconnectClientResponse = 31; } } diff --git a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto index 7c4435e..66e2cdd 100644 --- a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto +++ b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto @@ -23,3 +23,11 @@ message AuthenticationRequest { message AuthenticationResponse { bool authenticated = 1; } + +message DisconnectClientRequest { + string reason = 1; +} + +message DisconnectClientResponse { + // message presence indicates success. +} \ No newline at end of file diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java new file mode 100644 index 0000000..fa0f8f5 --- /dev/null +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.protocol.protobuf.v1.operations; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler; +import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext; +import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; +import org.apache.geode.internal.protocol.protobuf.v1.Result; +import org.apache.geode.internal.protocol.protobuf.v1.Success; +import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionTerminatingStateProcessor; + +public class DisconnectClientRequestOperationHandler implements + ProtobufOperationHandler<ConnectionAPI.DisconnectClientRequest, ConnectionAPI.DisconnectClientResponse> { + private static final Logger logger = LogService.getLogger(); + + @Override + public Result<ConnectionAPI.DisconnectClientResponse> process( + ProtobufSerializationService serializationService, + ConnectionAPI.DisconnectClientRequest request, + MessageExecutionContext messageExecutionContext) { + logger.info("Client disconnecting due to {}", request.getReason()); + messageExecutionContext + .setConnectionStateProcessor(new ProtobufConnectionTerminatingStateProcessor()); + + return Success.of(ConnectionAPI.DisconnectClientResponse.newBuilder().build()); + } +} diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java index 3bca6fe..13861d1 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java @@ -23,7 +23,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase; import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext; import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; -import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.operations.DisconnectClientRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnGroupRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnMemberRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler; @@ -72,6 +72,12 @@ public class ProtobufOperationContextRegistry { opsResp -> ClientProtocol.Message.newBuilder().setAuthenticationResponse(opsResp), this::skipAuthorizationCheck)); + operationContexts.put(ClientProtocol.Message.MessageTypeCase.DISCONNECTCLIENTREQUEST, + new ProtobufOperationContext<>(ClientProtocol.Message::getDisconnectClientRequest, + new DisconnectClientRequestOperationHandler(), + opsResp -> ClientProtocol.Message.newBuilder().setDisconnectClientResponse(opsResp), + this::skipAuthorizationCheck)); + operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETREQUEST, new ProtobufOperationContext<>(ClientProtocol.Message::getGetRequest, new GetRequestOperationHandler(), diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java new file mode 100644 index 0000000..5b47c8f --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.protocol.protobuf.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.Socket; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.RestoreSystemProperties; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.management.internal.security.ResourceConstants; +import org.apache.geode.security.SecurityManager; +import org.apache.geode.test.junit.categories.IntegrationTest; + +@Category(IntegrationTest.class) +public class DisconnectClientIntegrationTest { + public static final String SECURITY_PRINCIPAL = "principle"; + private Socket socket; + private Cache cache; + private SecurityManager securityManager; + + @Rule + public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + + @Before + public void setUp() throws Exception { + CacheFactory cacheFactory = new CacheFactory(new Properties()); + cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); + cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false"); + cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false"); + + securityManager = mock(SecurityManager.class); + cacheFactory.setSecurityManager(securityManager); + when(securityManager.authenticate(any())).thenReturn(SECURITY_PRINCIPAL); + when(securityManager.authorize(eq(SECURITY_PRINCIPAL), any())).thenReturn(true); + + cache = cacheFactory.create(); + + CacheServer cacheServer = cache.addCacheServer(); + int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); + cacheServer.setPort(cacheServerPort); + cacheServer.start(); + + RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); + regionFactory.setDataPolicy(DataPolicy.PARTITION); + + + System.setProperty("geode.feature-protobuf-protocol", "true"); + + socket = new Socket("localhost", cacheServerPort); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + + MessageUtil.performAndVerifyHandshake(socket); + } + + @After + public void tearDown() { + cache.close(); + try { + socket.close(); + } catch (IOException ignore) { + // NOP + } + } + + @Test + public void disconnectsFromServer() throws Exception { + authenticateWithServer(); + + final ClientProtocol.Message requestMessage = createRequestMessageBuilder( + ConnectionAPI.DisconnectClientRequest.newBuilder().setReason("Normal termination")).build(); + + final ClientProtocol.Message responseMessage = writeMessage(requestMessage); + assertEquals(responseMessage.toString(), + ClientProtocol.Message.MessageTypeCase.DISCONNECTCLIENTRESPONSE, + responseMessage.getMessageTypeCase()); + final ConnectionAPI.DisconnectClientResponse disconnectClientResponse = + responseMessage.getDisconnectClientResponse(); + assertNotNull(disconnectClientResponse); + } + + private void authenticateWithServer() throws IOException { + ClientProtocol.Message.Builder request = ClientProtocol.Message.newBuilder() + .setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder() + .putCredentials(ResourceConstants.USER_NAME, "someuser") + .putCredentials(ResourceConstants.PASSWORD, "somepassword")); + + ClientProtocol.Message response = writeMessage(request.build()); + assertTrue(response.getAuthenticationResponse().getAuthenticated()); + } + + private ClientProtocol.Message.Builder createRequestMessageBuilder( + ConnectionAPI.DisconnectClientRequest.Builder disconnectClientRequest) { + return ClientProtocol.Message.newBuilder().setDisconnectClientRequest(disconnectClientRequest); + } + + private ClientProtocol.Message writeMessage(ClientProtocol.Message request) throws IOException { + request.writeDelimitedTo(socket.getOutputStream()); + + return ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream()); + } +} -- To stop receiving notification emails like this one, please contact pivotalsa...@apache.org.