This is an automated email from the ASF dual-hosted git repository. gosullivan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit b974668bd7479c3dabea24ec3daa8c35bcadae8a Author: Galen O'Sullivan <gosulli...@pivotal.io> AuthorDate: Wed Sep 20 13:35:09 2017 -0700 GEODE-3546: Finish new protocol Locator stats. - Increment and decrement connection count stats. - Add stat checking to a locator test. - Add stats to a second test method. - Prevent locator state from one test from influencing another test. Signed-off-by: Galen O'Sullivan <gosulli...@pivotal.io> --- .../distributed/internal/InternalLocator.java | 4 +- .../distributed/internal/tcpserver/TcpServer.java | 14 +- .../acceptance/LocatorConnectionDUnitTest.java | 239 +++++++++++++++++---- 3 files changed, 210 insertions(+), 47 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java index c4541c3..489e647 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java @@ -1335,10 +1335,8 @@ public class InternalLocator extends Locator implements ConnectListener { try { this.stats.hookupStats(sys, SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress()); - ClientProtocolMessageHandler messageHandler = this.server.getMessageHandler(); + ClientProtocolMessageHandler messageHandler = this.server.getClientProtocolMessageHandler(); if (messageHandler != null) { - // GEODE-3546 - this should create locator-specific stats but is creating client/server - // stats messageHandler.initializeStatistics("LocatorStats", sys); } } catch (UnknownHostException e) { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index cf8e477..85b2ace 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -133,7 +133,7 @@ public class TcpServer { private final PoolStatHelper poolHelper; private final InternalLocator internalLocator; private final TcpHandler handler; - private ClientProtocolMessageHandler messageHandler; + private ClientProtocolMessageHandler clientProtocolMessageHandler; private PooledExecutorWithDMStats executor; @@ -158,20 +158,20 @@ public class TcpServer { /** * returns the message handler used for client/locator communications processing */ - public ClientProtocolMessageHandler getMessageHandler() { - return messageHandler; + public ClientProtocolMessageHandler getClientProtocolMessageHandler() { + return clientProtocolMessageHandler; } public TcpServer(int port, InetAddress bind_address, Properties sslConfig, DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper, ThreadGroup threadGroup, String threadName, InternalLocator internalLocator, - ClientProtocolMessageHandler messageHandler) { + ClientProtocolMessageHandler clientProtocolMessageHandler) { this.port = port; this.bind_address = bind_address; this.handler = handler; this.poolHelper = poolHelper; this.internalLocator = internalLocator; - this.messageHandler = messageHandler; + this.clientProtocolMessageHandler = clientProtocolMessageHandler; // register DSFID types first; invoked explicitly so that all message type // initializations do not happen in first deserialization on a possibly // "precious" thread @@ -381,8 +381,10 @@ public class TcpServer { if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) { if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL && Boolean.getBoolean("geode.feature-protobuf-protocol")) { - messageHandler.receiveMessage(input, socket.getOutputStream(), + clientProtocolMessageHandler.getStatistics().clientConnected(); + clientProtocolMessageHandler.receiveMessage(input, socket.getOutputStream(), new MessageExecutionContext(internalLocator)); + clientProtocolMessageHandler.getStatistics().clientDisconnected(); } else { rejectUnknownProtocolConnection(socket, gossipVersion); } diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java index 4e310d2..fe03740 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java @@ -17,21 +17,25 @@ package org.apache.geode.protocol.acceptance; import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.Socket; import java.util.Properties; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.contrib.java.lang.system.RestoreSystemProperties; import org.junit.experimental.categories.Category; +import org.apache.geode.Statistics; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.distributed.Locator; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.protocol.protobuf.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.ServerAPI; @@ -42,7 +46,10 @@ import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.RMIException; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.junit.categories.DistributedTest; /* @@ -51,29 +58,31 @@ import org.apache.geode.test.junit.categories.DistributedTest; @Category(DistributedTest.class) public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase { - private Socket socket; - @Rule - public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + public final DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); @Before public void setup() throws IOException { - Host host = Host.getHost(0); - int locatorPort = DistributedTestUtils.getDUnitLocatorPort(); startCacheWithCacheServer(); Host.getLocator().invoke(() -> System.setProperty("geode.feature-protobuf-protocol", "true")); + } - socket = new Socket(host.getHostName(), locatorPort); + private Socket createSocket() throws IOException { + Host host = Host.getHost(0); + int locatorPort = DistributedTestUtils.getDUnitLocatorPort(); + Socket socket = new Socket(host.getHostName(), locatorPort); DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream()); dataOutputStream.writeInt(0); // Using the constant from AcceptorImpl to ensure that magic byte is the same dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber()); + return socket; } + // Test getAvailableServers twice, validating stats before any messages, after 1, and after 2. @Test - public void testEchoProtobufMessageFromLocator() - throws IOException, InvalidProtocolMessageException { + public void testGetAvailableServersWithStats() throws Throwable { ClientProtocol.Request.Builder protobufRequestBuilder = ProtobufUtilities.createProtobufRequestBuilder(); ClientProtocol.Message getAvailableServersRequestMessage = @@ -81,48 +90,202 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase { protobufRequestBuilder.setGetAvailableServersRequest( ProtobufRequestUtilities.createGetAvailableServersRequest()).build()); - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - protobufProtocolSerializer.serialize(getAvailableServersRequestMessage, - socket.getOutputStream()); + try { + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - ClientProtocol.Message getAvailableServersResponseMessage = - protobufProtocolSerializer.deserialize(socket.getInputStream()); - assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId()); - assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, - getAvailableServersResponseMessage.getMessageTypeCase()); - ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse(); - assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE, - messageResponse.getResponseAPICase()); - ServerAPI.GetAvailableServersResponse getAvailableServersResponse = - messageResponse.getGetAvailableServersResponse(); - assertEquals(1, getAvailableServersResponse.getServersCount()); + try (Socket socket = createSocket()) { + long messagesReceived = getMessagesReceived(); + long messagesSent = getMessagesSent(); + int clientConnectionStarts = getClientConnectionStarts(); + int clientConnectionTerminations = getClientConnectionTerminations(); + + protobufProtocolSerializer.serialize(getAvailableServersRequestMessage, + socket.getOutputStream()); + + validateGetAvailableServersResponse(protobufProtocolSerializer, socket.getInputStream()); + + Host.getLocator().invoke(() -> { + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) Locator.getLocator().getDistributedSystem(); + + Statistics[] protobufServerStats = distributedSystem + .findStatisticsByType(distributedSystem.findType("ProtobufServerStats")); + assertEquals(1, protobufServerStats.length); + Statistics statistics = protobufServerStats[0]; + assertEquals(0, statistics.get("currentClientConnections")); + assertEquals(messagesReceived + 1, statistics.get("messagesReceived")); + assertEquals(messagesSent + 1, statistics.get("messagesSent")); + assertTrue(statistics.get("bytesReceived").longValue() > 0); + assertTrue(statistics.get("bytesSent").longValue() > 0); + assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts")); + assertEquals(clientConnectionTerminations + 1, + statistics.get("clientConnectionTerminations")); + assertEquals(0L, statistics.get("authorizationViolations")); + assertEquals(0L, statistics.get("authenticationFailures")); + }); + } + + try (Socket socket = createSocket()) { + long messagesReceived = getMessagesReceived(); + long messagesSent = getMessagesSent(); + int clientConnectionStarts = getClientConnectionStarts(); + int clientConnectionTerminations = getClientConnectionTerminations(); + + protobufProtocolSerializer.serialize(getAvailableServersRequestMessage, + socket.getOutputStream()); + + validateGetAvailableServersResponse(protobufProtocolSerializer, socket.getInputStream()); + + Host.getLocator().invoke(() -> { + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) Locator.getLocator().getDistributedSystem(); + + Statistics[] protobufServerStats = distributedSystem + .findStatisticsByType(distributedSystem.findType("ProtobufServerStats")); + assertEquals(1, protobufServerStats.length); + Statistics statistics = protobufServerStats[0]; + assertEquals(0, statistics.get("currentClientConnections")); + assertEquals(messagesReceived + 1, statistics.get("messagesReceived")); + assertEquals(messagesSent + 1, statistics.get("messagesSent")); + assertTrue(statistics.get("bytesReceived").longValue() > 0); + assertTrue(statistics.get("bytesSent").longValue() > 0); + assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts")); + assertEquals(clientConnectionTerminations + 1, + statistics.get("clientConnectionTerminations")); + assertEquals(0L, statistics.get("authorizationViolations")); + assertEquals(0L, statistics.get("authenticationFailures")); + }); + } + } catch (RMIException e) { + throw e.getCause(); // so that assertions propagate properly. + } } @Test public void testInvalidOperationReturnsFailure() throws IOException, InvalidProtocolMessageException { - ClientProtocol.Request.Builder protobufRequestBuilder = - ProtobufUtilities.createProtobufRequestBuilder(); - ClientProtocol.Message getAvailableServersRequestMessage = - ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445), - protobufRequestBuilder - .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest()) - .build()); + IgnoredException ignoredInvalidExecutionContext = + IgnoredException.addIgnoredException("Invalid execution context"); + try (Socket socket = createSocket()) { + + ClientProtocol.Request.Builder protobufRequestBuilder = + ProtobufUtilities.createProtobufRequestBuilder(); + ClientProtocol.Message getRegionNamesRequestMessage = + ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445), + protobufRequestBuilder + .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest()) + .build()); + + long messagesReceived = getMessagesReceived(); + long messagesSent = getMessagesSent(); + int clientConnectionStarts = getClientConnectionStarts(); + int clientConnectionTerminations = getClientConnectionTerminations(); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + protobufProtocolSerializer.serialize(getRegionNamesRequestMessage, socket.getOutputStream()); + + ClientProtocol.Message getAvailableServersResponseMessage = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(1233445, + getAvailableServersResponseMessage.getMessageHeader().getCorrelationId()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, + getAvailableServersResponseMessage.getMessageTypeCase()); + ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse(); + assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE, + messageResponse.getResponseAPICase()); + assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue, + messageResponse.getErrorResponse().getError().getErrorCode()); + + Host.getLocator().invoke(() -> { + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) Locator.getLocator().getDistributedSystem(); + + Statistics[] protobufServerStats = distributedSystem + .findStatisticsByType(distributedSystem.findType("ProtobufServerStats")); + assertEquals(1, protobufServerStats.length); + Statistics statistics = protobufServerStats[0]; + assertEquals(0, statistics.get("currentClientConnections")); + assertEquals(messagesReceived + 1, statistics.get("messagesReceived")); + assertEquals(messagesSent + 1, statistics.get("messagesSent")); + assertTrue(statistics.get("bytesReceived").longValue() > 0); + assertTrue(statistics.get("bytesSent").longValue() > 0); + assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts")); + assertEquals(clientConnectionTerminations + 1, + statistics.get("clientConnectionTerminations")); + assertEquals(0L, statistics.get("authorizationViolations")); + assertEquals(0L, statistics.get("authenticationFailures")); + }); + } + ignoredInvalidExecutionContext.remove(); + } + + private Long getMessagesReceived() { + return Host.getLocator().invoke(() -> { + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) Locator.getLocator().getDistributedSystem(); + + Statistics[] protobufServerStats = + distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats")); + assertEquals(1, protobufServerStats.length); + Statistics statistics = protobufServerStats[0]; + return statistics.get("messagesReceived").longValue(); + }); + } + + private Long getMessagesSent() { + return Host.getLocator().invoke(() -> { + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) Locator.getLocator().getDistributedSystem(); + + Statistics[] protobufServerStats = + distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats")); + assertEquals(1, protobufServerStats.length); + Statistics statistics = protobufServerStats[0]; + return statistics.get("messagesSent").longValue(); + }); + } + + private Integer getClientConnectionStarts() { + return Host.getLocator().invoke(() -> { + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) Locator.getLocator().getDistributedSystem(); + + Statistics[] protobufServerStats = + distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats")); + assertEquals(1, protobufServerStats.length); + Statistics statistics = protobufServerStats[0]; + return statistics.get("clientConnectionStarts").intValue(); + }); + } - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - protobufProtocolSerializer.serialize(getAvailableServersRequestMessage, - socket.getOutputStream()); + private Integer getClientConnectionTerminations() { + return Host.getLocator().invoke(() -> { + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) Locator.getLocator().getDistributedSystem(); + Statistics[] protobufServerStats = + distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats")); + assertEquals(1, protobufServerStats.length); + Statistics statistics = protobufServerStats[0]; + return statistics.get("clientConnectionTerminations").intValue(); + }); + } + + private void validateGetAvailableServersResponse( + ProtobufProtocolSerializer protobufProtocolSerializer, InputStream inputStream) + throws InvalidProtocolMessageException, IOException { ClientProtocol.Message getAvailableServersResponseMessage = - protobufProtocolSerializer.deserialize(socket.getInputStream()); + protobufProtocolSerializer.deserialize(inputStream); + assertNotNull(getAvailableServersResponseMessage); assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId()); assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, getAvailableServersResponseMessage.getMessageTypeCase()); ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse(); - assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE, + assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE, messageResponse.getResponseAPICase()); - assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue, - messageResponse.getErrorResponse().getError().getErrorCode()); + ServerAPI.GetAvailableServersResponse getAvailableServersResponse = + messageResponse.getGetAvailableServersResponse(); + assertEquals(1, getAvailableServersResponse.getServersCount()); } @Override -- To stop receiving notification emails like this one, please contact "commits@geode.apache.org" <commits@geode.apache.org>.