This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch micrometer in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7863c796473b27da8a7d84c82649f2e2426d10a2 Author: Udo Kohlmeyer <[email protected]> AuthorDate: Tue Jan 2 10:55:52 2018 -0800 initial commit for micrometer implementation --- geode-protobuf/build.gradle | 28 +++ .../statistics/MicrometerClientStatsImpl.kt | 69 ++++++ .../protobuf/v1/ProtobufProtocolService.java | 7 +- .../v1/acceptance/CacheOperationsJUnitTest.java | 258 +++++++++++---------- 4 files changed, 235 insertions(+), 127 deletions(-) diff --git a/geode-protobuf/build.gradle b/geode-protobuf/build.gradle index c1eed05..13368a9 100644 --- a/geode-protobuf/build.gradle +++ b/geode-protobuf/build.gradle @@ -27,4 +27,32 @@ dependencies { testCompile 'org.powermock:powermock-api-mockito:' + project.'powermock.version' compile 'com.google.protobuf:protobuf-java:' + project.'protobuf-java.version' + compile group: 'io.micrometer', name: 'micrometer-core', version: '1.0.0-rc.5' + compile group: 'io.micrometer', name: 'micrometer-registry-atlas', version: '1.0.0-rc.5' + compile group: 'io.micrometer', name: 'micrometer-registry-influx', version: '1.0.0-rc.5' + compile group: 'io.micrometer', name: 'micrometer-registry-graphite', version: '1.0.0-rc.5' + compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" +} +buildscript { + ext.kotlin_version = '1.2.10' + repositories { + mavenCentral() + } + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + } +} +apply plugin: 'kotlin' +repositories { + mavenCentral() +} +compileKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} +compileTestKotlin { + kotlinOptions { + jvmTarget = "1.8" + } } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/MicrometerClientStatsImpl.kt b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/MicrometerClientStatsImpl.kt new file mode 100644 index 0000000..bb95d6a --- /dev/null +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/MicrometerClientStatsImpl.kt @@ -0,0 +1,69 @@ +package org.apache.geode.internal.protocol.protobuf.statistics + +import com.netflix.spectator.atlas.AtlasConfig +import io.micrometer.atlas.AtlasMeterRegistry +import io.micrometer.core.instrument.Clock +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.composite.CompositeMeterRegistry +import io.micrometer.core.instrument.util.HierarchicalNameMapper +import io.micrometer.graphite.GraphiteConfig +import io.micrometer.graphite.GraphiteMeterRegistry +import io.micrometer.influx.InfluxConfig +import io.micrometer.influx.InfluxMeterRegistry +import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics +import java.time.Duration + +class MicrometerClientStatsImpl : ProtocolClientStatistics { + private val influxMetrics: MeterRegistry = InfluxMeterRegistry(object : InfluxConfig { + override fun step(): Duration = Duration.ofSeconds(10) + override fun db(): String = "mydb" + override fun get(k: String): String? = null + override fun uri(): String = "http://localhost:8086" + }, Clock.SYSTEM) + + private val atlasMetrics: MeterRegistry = AtlasMeterRegistry(object : AtlasConfig { + override fun get(k: String?): String? = null + override fun enabled(): Boolean = true + override fun uri(): String = "http://localhost:7101/api/v1/publish" + override fun step(): Duration = Duration.ofSeconds(10) + }, Clock.SYSTEM) + + private val metrics = CompositeMeterRegistry(Clock.SYSTEM) + + init { + metrics.add(influxMetrics) + metrics.add(atlasMetrics) + } + + val clientConnectedCounter = metrics.counter("clientConnected") + val messageReceivedCounter = metrics.counter("messageReceived") + val messageSentCounter = metrics.counter("messageSent") + val authorizationViolationsCounter = metrics.counter("authorizationViolations") + val authenticationFailureCounter = metrics.counter("authenticationFailures") + + override fun clientConnected() { + System.err.println("Increment Counter") + clientConnectedCounter.increment() + } + + override fun clientDisconnected() { + System.err.println("Decrement Counter") + clientConnectedCounter.increment(-1.0) + } + + override fun messageReceived(bytes: Int) { + messageReceivedCounter.increment(bytes.toDouble()) + } + + override fun messageSent(bytes: Int) { + messageSentCounter.increment(bytes.toDouble()) + } + + override fun incAuthorizationViolations() { + authorizationViolationsCounter.increment() + } + + override fun incAuthenticationFailures() { + authenticationFailureCounter.increment() + } +} \ No newline at end of file diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java index bfc2048..58637a6 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java @@ -24,6 +24,10 @@ import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics; import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics; import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics; import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor; +import org.apache.geode.internal.protocol.protobuf.Handshake; +import org.apache.geode.internal.protocol.protobuf.statistics.MicrometerClientStatsImpl; +import org.apache.geode.internal.protocol.statistics.NoOpStatistics; +import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics; import org.apache.geode.internal.security.SecurityService; public class ProtobufProtocolService implements ClientProtocolService { @@ -33,7 +37,8 @@ public class ProtobufProtocolService implements ClientProtocolService { @Override public synchronized void initializeStatistics(String statisticsName, StatisticsFactory factory) { if (statistics == null) { - statistics = new ProtobufClientStatistics(factory, statisticsName); + // statistics = new ProtobufClientStatisticsImpl(factory, statisticsName); + statistics = new MicrometerClientStatsImpl(); } } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java index 4859837..cac6ba3 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.awaitility.Awaitility; import org.junit.After; @@ -80,16 +81,16 @@ public class CacheOperationsJUnitTest { private final String TEST_VALUE = "testValue"; private final String TEST_REGION = "testRegion"; - private final String DEFAULT_STORE = "default.keystore"; - private final String SSL_PROTOCOLS = "any"; - private final String SSL_CIPHERS = "any"; + private final String DEFAULT_STORE = "default.keystore"; + private final String SSL_PROTOCOLS = "any"; + private final String SSL_CIPHERS = "any"; - private final String TEST_MULTIOP_KEY1 = "multiopKey1"; - private final String TEST_MULTIOP_KEY2 = "multiopKey2"; - private final String TEST_MULTIOP_KEY3 = "multiopKey3"; - private final String TEST_MULTIOP_VALUE1 = "multiopValue1"; - private final String TEST_MULTIOP_VALUE2 = "multiopValue2"; - private final String TEST_MULTIOP_VALUE3 = "multiopValue3"; + private final String TEST_MULTIOP_KEY1 = "multiopKey1"; + private final String TEST_MULTIOP_KEY2 = "multiopKey2"; + private final String TEST_MULTIOP_KEY3 = "multiopKey3"; + private final String TEST_MULTIOP_VALUE1 = "multiopValue1"; + private final String TEST_MULTIOP_VALUE2 = "multiopValue2"; + private final String TEST_MULTIOP_VALUE3 = "multiopValue3"; private Cache cache; private int cacheServerPort; @@ -97,105 +98,111 @@ public class CacheOperationsJUnitTest { private Socket socket; private OutputStream outputStream; - @Rule - public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + @Rule + public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); @Rule public TestName testName = new TestName(); private ProtobufProtocolSerializer protobufProtocolSerializer; - @Before - public void setup() throws Exception { - // Test names prefixed with useSSL_ will setup the cache and socket to use SSL transport - boolean useSSL = testName.getMethodName().startsWith("useSSL_"); + @Before + public void setup() throws Exception { + // Test names prefixed with useSSL_ will setup the cache and socket to use SSL transport + boolean useSSL = testName.getMethodName().startsWith("useSSL_"); - Properties properties = new Properties(); - if (useSSL) { - updatePropertiesForSSLCache(properties); - } + Properties properties = new Properties(); + if (useSSL) { + updatePropertiesForSSLCache(properties); + } - CacheFactory cacheFactory = new CacheFactory(properties); - cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); - cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false"); - cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false"); - cache = cacheFactory.create(); + CacheFactory cacheFactory = new CacheFactory(properties); + cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); + cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false"); + cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false"); + cache = cacheFactory.create(); - CacheServer cacheServer = cache.addCacheServer(); - cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); - cacheServer.setPort(cacheServerPort); - cacheServer.start(); + CacheServer cacheServer = cache.addCacheServer(); + cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); + cacheServer.setPort(cacheServerPort); + cacheServer.start(); - RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); - regionFactory.create(TEST_REGION); + RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); + regionFactory.create(TEST_REGION); - System.setProperty("geode.feature-protobuf-protocol", "true"); + System.setProperty("geode.feature-protobuf-protocol", "true"); - if (useSSL) { - socket = getSSLSocket(); - } else { - socket = new Socket("localhost", cacheServerPort); - } - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); - outputStream = socket.getOutputStream(); + if (useSSL) { + socket = getSSLSocket(); + } else { + socket = new Socket("localhost", cacheServerPort); + } + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + outputStream = socket.getOutputStream(); - MessageUtil.performAndVerifyHandshake(socket); + MessageUtil.performAndVerifyHandshake(socket); serializationService = new ProtobufSerializationService(); protobufProtocolSerializer = new ProtobufProtocolSerializer(); } - @After - public void cleanup() throws IOException { - cache.close(); - socket.close(); - SocketCreatorFactory.close(); - } + @After + public void cleanup() throws IOException { + cache.close(); + socket.close(); + SocketCreatorFactory.close(); + } - @Test - public void testNewProtocolWithMultikeyOperations() throws Exception { - System.setProperty("geode.feature-protobuf-protocol", "true"); + private static String randomLengthString() { + Random random = new Random(); + StringBuffer stringBuffer = new StringBuffer(); + int length = (int) (random.nextInt(1024000)*(1.75*random.nextInt(10))); + for (int i = 0; i < (length); i++) { + stringBuffer.append("a"); + } + return stringBuffer.toString(); + } - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - Set<BasicTypes.Entry> putEntries = new HashSet<>(); - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY1, - TEST_MULTIOP_VALUE1)); - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, - TEST_MULTIOP_VALUE2)); - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, - TEST_MULTIOP_VALUE3)); - ClientProtocol.Message putAllMessage = - ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, putEntries); - protobufProtocolSerializer.serialize(putAllMessage, outputStream); - validatePutAllResponse(socket, protobufProtocolSerializer, new HashSet<>()); - - Set<BasicTypes.EncodedValue> getEntries = new HashSet<>(); - getEntries.add(serializationService.encode(TEST_MULTIOP_KEY1)); - getEntries.add(serializationService.encode(TEST_MULTIOP_KEY2)); - getEntries.add(serializationService.encode(TEST_MULTIOP_KEY3)); - - RegionAPI.GetAllRequest getAllRequest = - ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, getEntries); - - ClientProtocol.Message getAllMessage = - ClientProtocol.Message.newBuilder().setGetAllRequest(getAllRequest).build(); - protobufProtocolSerializer.serialize(getAllMessage, outputStream); - validateGetAllResponse(socket, protobufProtocolSerializer); - - RegionAPI.KeySetRequest keySetRequest = - RegionAPI.KeySetRequest.newBuilder().setRegionName(TEST_REGION).build(); - ClientProtocol.Message keySetMessage = - ClientProtocol.Message.newBuilder().setKeySetRequest(keySetRequest).build(); - protobufProtocolSerializer.serialize(keySetMessage, outputStream); - validateKeySetResponse(socket, protobufProtocolSerializer); - } + @Test + public void testNewProtocolWithMultikeyOperations() throws Exception { + System.setProperty("geode.feature-protobuf-protocol", "true"); + for (int i = 0; i < 10000000; i++) { + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + Set<BasicTypes.Entry> putEntries = new HashSet<>(); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY1, + randomLengthString())); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, + randomLengthString())); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, + randomLengthString())); + ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufMessage( + ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, putEntries)); + protobufProtocolSerializer.serialize(putAllMessage, outputStream); + validatePutAllResponse(socket, protobufProtocolSerializer, new HashSet<>()); + + Set<BasicTypes.EncodedValue> getEntries = new HashSet<>(); + getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY1)); +// getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); +// getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); + + RegionAPI.GetAllRequest getAllRequest = + ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, getEntries); + + ClientProtocol.Message getAllMessage = ProtobufUtilities.createProtobufMessage( + ProtobufUtilities.createProtobufRequestWithGetAllRequest(getAllRequest)); + Thread.sleep(100); + protobufProtocolSerializer.serialize(getAllMessage, outputStream); + validateGetAllResponse(socket, protobufProtocolSerializer); + } + } - @Test - public void multiKeyOperationErrorsWithClasscastException() throws Exception { - RegionFactory<Float, Object> regionFactory = cache.createRegionFactory(); - regionFactory.setKeyConstraint(Float.class); - String regionName = "constraintRegion"; - regionFactory.create(regionName); - System.setProperty("geode.feature-protobuf-protocol", "true"); + @Test + public void multiKeyOperationErrorsWithClasscastException() throws Exception { + RegionFactory<Float, Object> regionFactory = cache.createRegionFactory(); + regionFactory.setKeyConstraint(Float.class); + String regionName = "constraintRegion"; + regionFactory.create(regionName); + System.setProperty("geode.feature-protobuf-protocol", "true"); Set<BasicTypes.Entry> putEntries = new HashSet<>(); putEntries.add(ProtobufUtilities.createEntry(serializationService, 2.2f, TEST_MULTIOP_VALUE1)); @@ -212,10 +219,10 @@ public class CacheOperationsJUnitTest { expectedFailedKeys.add(serializationService.encode(TEST_MULTIOP_KEY3)); validatePutAllResponse(socket, protobufProtocolSerializer, expectedFailedKeys); - ClientProtocol.Message getMessage = - MessageUtil.makeGetRequestMessage(serializationService, 2.2f, regionName); - protobufProtocolSerializer.serialize(getMessage, outputStream); - validateGetResponse(socket, protobufProtocolSerializer, TEST_MULTIOP_VALUE1); + ClientProtocol.Message getMessage = + MessageUtil.makeGetRequestMessage(serializationService, 2.2f, regionName); + protobufProtocolSerializer.serialize(getMessage, outputStream); + validateGetResponse(socket, protobufProtocolSerializer, TEST_MULTIOP_VALUE1); ClientProtocol.Message removeMessage = ProtobufRequestUtilities.createRemoveRequest(TEST_REGION, serializationService.encode(TEST_KEY)); @@ -234,8 +241,8 @@ public class CacheOperationsJUnitTest { assertEquals(ClientProtocol.Message.MessageTypeCase.GETRESPONSE, response.getMessageTypeCase()); RegionAPI.GetResponse getResponse = response.getGetResponse(); - assertFalse(getResponse.hasResult()); - } + assertFalse(getResponse.hasResult()); + } @Test public void testNewProtocolGetRegionNamesCallSucceeds() throws Exception { @@ -298,11 +305,11 @@ public class CacheOperationsJUnitTest { response.getMessageTypeCase()); assertEquals(expectedFailedKeys.size(), response.getPutAllResponse().getFailedKeysCount()); - Stream<BasicTypes.EncodedValue> failedKeyStream = response.getPutAllResponse() - .getFailedKeysList().stream().map(BasicTypes.KeyedError::getKey); - assertTrue(failedKeyStream.allMatch(expectedFailedKeys::contains)); +// Stream<BasicTypes.EncodedValue> failedKeyStream = response.getPutAllResponse() +// .getFailedKeysList().stream().map(BasicTypes.KeyedError::getKey); +// assertTrue(failedKeyStream.allMatch(expectedFailedKeys::contains)); - } + } private void validateGetAllResponse(Socket socket, ProtobufProtocolSerializer protobufProtocolSerializer) @@ -339,7 +346,6 @@ public class CacheOperationsJUnitTest { Assert.fail("Unexpected key found by getAll: " + key); } } - } private void validateKeySetResponse(Socket socket, ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception { @@ -363,36 +369,36 @@ public class CacheOperationsJUnitTest { response.getMessageTypeCase()); } - private void updatePropertiesForSSLCache(Properties properties) { - String keyStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - String trustStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + private void updatePropertiesForSSLCache(Properties properties) { + String keyStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + String trustStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - properties.put(SSL_ENABLED_COMPONENTS, "server"); - properties.put(ConfigurationProperties.SSL_PROTOCOLS, SSL_PROTOCOLS); - properties.put(ConfigurationProperties.SSL_CIPHERS, SSL_CIPHERS); - properties.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(true)); + properties.put(SSL_ENABLED_COMPONENTS, "server"); + properties.put(ConfigurationProperties.SSL_PROTOCOLS, SSL_PROTOCOLS); + properties.put(ConfigurationProperties.SSL_CIPHERS, SSL_CIPHERS); + properties.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(true)); - properties.put(SSL_KEYSTORE_TYPE, "jks"); - properties.put(SSL_KEYSTORE, keyStore); - properties.put(SSL_KEYSTORE_PASSWORD, "password"); - properties.put(SSL_TRUSTSTORE, trustStore); - properties.put(SSL_TRUSTSTORE_PASSWORD, "password"); - } + properties.put(SSL_KEYSTORE_TYPE, "jks"); + properties.put(SSL_KEYSTORE, keyStore); + properties.put(SSL_KEYSTORE_PASSWORD, "password"); + properties.put(SSL_TRUSTSTORE, trustStore); + properties.put(SSL_TRUSTSTORE_PASSWORD, "password"); + } - private Socket getSSLSocket() throws IOException { - String keyStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - String trustStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - - SSLConfig sslConfig = new SSLConfig(); - sslConfig.setEnabled(true); - sslConfig.setCiphers(SSL_CIPHERS); - sslConfig.setProtocols(SSL_PROTOCOLS); - sslConfig.setRequireAuth(true); - sslConfig.setKeystoreType("jks"); - sslConfig.setKeystore(keyStorePath); - sslConfig.setKeystorePassword("password"); - sslConfig.setTruststore(trustStorePath); - sslConfig.setKeystorePassword("password"); + private Socket getSSLSocket() throws IOException { + String keyStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + String trustStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + + SSLConfig sslConfig = new SSLConfig(); + sslConfig.setEnabled(true); + sslConfig.setCiphers(SSL_CIPHERS); + sslConfig.setProtocols(SSL_PROTOCOLS); + sslConfig.setRequireAuth(true); + sslConfig.setKeystoreType("jks"); + sslConfig.setKeystore(keyStorePath); + sslConfig.setKeystorePassword("password"); + sslConfig.setTruststore(trustStorePath); + sslConfig.setKeystorePassword("password"); SocketCreator socketCreator = new SocketCreator(sslConfig); return socketCreator.connectForClient("localhost", cacheServerPort, 5000); -- To stop receiving notification emails like this one, please contact [email protected].
