This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 0dc3e9e KUDU-3148: [test] Add Java client metrics
0dc3e9e is described below
commit 0dc3e9e0a7306ce9b618158b8a20af9b10f4a482
Author: Grant Henke <[email protected]>
AuthorDate: Thu Jun 11 10:54:27 2020 -0500
KUDU-3148: [test] Add Java client metrics
This patch adds the basic tools for instrumenting the Java
Kudu client with metrics. The primary purpose for this patch
is greater visibility and validation in tests. However, it leverages
micrometer for the metrics to allow for expanded usage and use
cases in the future (e.g., JMX publishing, periodic diagnostic
logging, etc).
In this patch I add some counter metrics for RPC requests,
retries, and responses. I leverage these metrics in
TestScanToken to validate the current behavior of ScanToken
generation and use. A follow-on patch will use these metrics
more to validate improvements to the ScanToken.
I did not use or enhance the existing Statistics.java
implementation because it didn’t appear to be flexible and
powerful enough for general purpose metrics without a lot of
work (effectively writing something like micrometer). Outside
of the additional work, Statistics.java is public API, which makes
it more difficult to change.
Change-Id: I5c63835dd717c2c1e1dca06ed5dea3c2cadcd018
Reviewed-on: http://gerrit.cloudera.org:8080/16067
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
---
java/gradle/dependencies.gradle | 2 +
java/kudu-client/build.gradle | 1 +
.../org/apache/kudu/client/AsyncKuduClient.java | 13 +-
.../java/org/apache/kudu/client/KuduClient.java | 8 +
.../java/org/apache/kudu/client/KuduMetrics.java | 228 +++++++++++++++++++++
.../main/java/org/apache/kudu/client/RpcProxy.java | 22 +-
.../org/apache/kudu/client/TestKuduMetrics.java | 76 +++++++
.../java/org/apache/kudu/client/TestScanToken.java | 46 ++++-
.../java/org/apache/kudu/test/KuduTestHarness.java | 3 +
.../java/org/apache/kudu/test/MetricTestUtils.java | 112 ++++++++++
10 files changed, 508 insertions(+), 3 deletions(-)
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 04b47e5..e66d7f5 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -46,6 +46,7 @@ versions += [
jsr305 : "3.0.2",
junit : "4.13",
log4j : "2.11.2",
+ micrometer : "1.5.1",
mockito : "3.3.3",
murmur : "1.0.0",
netty : "4.1.49.Final",
@@ -102,6 +103,7 @@ libs += [
junit : "junit:junit:$versions.junit",
log4j :
"org.apache.logging.log4j:log4j-1.2-api:$versions.log4j",
log4jSlf4jImpl :
"org.apache.logging.log4j:log4j-slf4j-impl:$versions.log4j",
+ micrometerCore :
"io.micrometer:micrometer-core:$versions.micrometer",
mockitoCore : "org.mockito:mockito-core:$versions.mockito",
murmur : "com.sangupta:murmur:$versions.murmur",
netty : "io.netty:netty-all:$versions.netty",
diff --git a/java/kudu-client/build.gradle b/java/kudu-client/build.gradle
index 382d746..f681ac2 100644
--- a/java/kudu-client/build.gradle
+++ b/java/kudu-client/build.gradle
@@ -29,6 +29,7 @@ dependencies {
compileUnshaded libs.slf4jApi
compile libs.guava
+ compile libs.micrometerCore
compile libs.murmur
compile libs.netty
compile libs.protobufJava
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 4b40a39..e808209 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -315,6 +315,8 @@ public class AsyncKuduClient implements AutoCloseable {
private final HashedWheelTimer timer;
+ private final String clientId;
+
/**
* Timestamp required for HybridTime external consistency through timestamp
* propagation.
@@ -378,7 +380,8 @@ public class AsyncKuduClient implements AutoCloseable {
this.statisticsDisabled = b.statisticsDisabled;
this.statistics = statisticsDisabled ? null : new Statistics();
this.timer = b.timer;
- this.requestTracker = new
RequestTracker(UUID.randomUUID().toString().replace("-", ""));
+ this.clientId = UUID.randomUUID().toString().replace("-", "");
+ this.requestTracker = new RequestTracker(clientId);
this.securityContext = new SecurityContext();
this.connectionCache = new ConnectionCache(securityContext, bootstrap);
@@ -538,6 +541,14 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
+ * Returns the unique client id assigned to this client.
+ * The id is a random UUID with the dashes removed and is the same value
+ * that is handed to this client's RequestTracker.
+ * @return the unique client id assigned to this client.
+ */
+ String getClientId() {
+ return clientId;
+ }
+
+ /**
* Returns a synchronous {@link KuduClient} which wraps this asynchronous
client.
* Calling {@link KuduClient#close} on the returned client will close this
client.
* If this asynchronous client should outlive the returned synchronous
client,
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 7c84373..ebc9096 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -93,6 +93,14 @@ public class KuduClient implements AutoCloseable {
}
/**
+ * Returns the unique client id assigned to this client.
+ * This simply delegates to the wrapped asynchronous client.
+ * @return the unique client id assigned to this client.
+ */
+ String getClientId() {
+ return asyncClient.getClientId();
+ }
+
+ /**
* Returns the Hive Metastore configuration of the cluster.
*
* @return the Hive Metastore configuration of the cluster
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduMetrics.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduMetrics.java
new file mode 100644
index 0000000..83cda05
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduMetrics.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.Comparator;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.micrometer.core.instrument.config.MeterFilter;
+import io.micrometer.core.instrument.config.NamingConvention;
+import io.micrometer.core.instrument.search.Search;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A static utility class to contain constants and methods for working with
+ * Kudu Java client metrics.
+ *
+ * <p>Metrics are disabled by default, see {@link #setEnabled(boolean)}. While
+ * disabled, the registry denies every meter, so nothing is recorded.
+ *
+ * <p>NOTE: The metrics are not considered public API yet. We should not expose
+ * micrometer objects/classes through any public interface or method, even
+ * when we do make them public.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KuduMetrics {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduMetrics.class);
+
+  public static final String[] EMPTY_TAGS = new String[]{};
+
+  // RPC Metrics
+  public static final KuduMetricId RPC_REQUESTS_METRIC =
+      new KuduMetricId("rpc.requests", "A count of the sent request RPCs", "requests");
+  public static final KuduMetricId RPC_RETRIES_METRIC =
+      new KuduMetricId("rpc.retries", "A count of the retried request RPCs", "retries");
+  public static final KuduMetricId RPC_RESPONSE_METRIC =
+      new KuduMetricId("rpc.responses", "A count of the RPC responses received", "responses");
+
+  // Common Tags
+  public static final String CLIENT_ID_TAG = "client.id";
+  public static final String SERVER_ID_TAG = "server.id";
+  public static final String SERVICE_NAME_TAG = "service.name";
+  public static final String METHOD_NAME_TAG = "method.name";
+
+  // TODO(KUDU-3148): After extensive testing consider enabling metrics by default.
+  // Only read and written inside the synchronized setEnabled() method.
+  private static boolean enabled = false;
+  // Volatile because setEnabled() swaps the registry while holding the class
+  // lock, but every other method in this class reads it without synchronization.
+  private static volatile CompositeMeterRegistry registry = createDisabledRegistry();
+
+  /**
+   * This class is meant to be used statically.
+   */
+  private KuduMetrics() {
+  }
+
+  /**
+   * Enable or disable metric tracking.
+   * Disabling the metrics will discard any previously recorded metrics.
+   * Calling this method with the current state is a no-op.
+   *
+   * @param enable If true, metric tracking is enabled.
+   */
+  public static synchronized void setEnabled(boolean enable) {
+    if (enable == enabled) {
+      return;
+    }
+    // Install the replacement registry before closing the old one so that
+    // the field never refers to a closed registry.
+    CompositeMeterRegistry oldRegistry = registry;
+    registry = enable ? createRegistry() : createDisabledRegistry();
+    enabled = enable;
+    oldRegistry.close();
+  }
+
+  /**
+   * @return a new, empty registry that records the meters registered with it.
+   */
+  private static CompositeMeterRegistry createRegistry() {
+    CompositeMeterRegistry newRegistry = new CompositeMeterRegistry();
+    // This is the default naming convention that separates lowercase words
+    // with a '.' (dot) character.
+    newRegistry.config().namingConvention(NamingConvention.dot);
+    // Use the minimal meter registry. Once this is used/useful for more than tests
+    // we may want to consider something more exposed such as JMX.
+    newRegistry.add(new SimpleMeterRegistry());
+    return newRegistry;
+  }
+
+  /**
+   * @return a new registry that denies every meter registered with it.
+   */
+  private static CompositeMeterRegistry createDisabledRegistry() {
+    CompositeMeterRegistry newRegistry = createRegistry();
+    // Add a filter to deny all meters. When a meter is used with this registry,
+    // the registry will return a NOOP version of that meter. Anything recorded
+    // to it is discarded immediately with minimal overhead.
+    newRegistry.config().meterFilter(MeterFilter.deny());
+    return newRegistry;
+  }
+
+  /**
+   * @return the total number of registered metrics.
+   */
+  static int numMetrics() {
+    return registry.getMeters().size();
+  }
+
+  /**
+   * @param id the metric id
+   * @return the number of all the matching metrics.
+   */
+  static int numMetrics(KuduMetricId id) {
+    return numMetrics(id, EMPTY_TAGS);
+  }
+
+  /**
+   * @param id the metric id
+   * @param tags tags must be an even number of arguments representing key/value pairs of tags.
+   * @return the number of counters matching the metric id and all of the tags.
+   */
+  static int numMetrics(KuduMetricId id, String... tags) {
+    return Search.in(registry).name(id.getName()).tags(tags).counters().size();
+  }
+
+  /**
+   * @param tags tags must be an even number of arguments representing key/value pairs of tags.
+   * @return the number of counters, regardless of metric id, matching all of the tags.
+   */
+  static int numMetrics(String... tags) {
+    return Search.in(registry).tags(tags).counters().size();
+  }
+
+  /**
+   * Returns the counter meter for the given metric id and tags.
+   * If the meter is already registered, it will lookup the existing meter and
+   * return it. Otherwise it will register a new meter and return that.
+   *
+   * @param id the metric id
+   * @param tags tags must be an even number of arguments representing key/value pairs of tags.
+   * @return a counter
+   */
+  static Counter counter(KuduMetricId id, String... tags) {
+    return Counter.builder(id.getName())
+        .description(id.getDescription())
+        .baseUnit(id.getUnit())
+        .tags(tags)
+        .register(registry);
+  }
+
+  /**
+   * @param id the metric id
+   * @return the sum of all the matching metrics.
+   */
+  public static double totalCount(KuduMetricId id) {
+    return totalCount(id, EMPTY_TAGS);
+  }
+
+  /**
+   * @param id the metric id
+   * @param tags tags must be an even number of arguments representing key/value pairs of tags.
+   * @return the sum of all the matching metrics.
+   */
+  public static double totalCount(KuduMetricId id, String... tags) {
+    return Search.in(registry).name(id.getName()).tags(tags).counters().stream()
+        .mapToDouble(Counter::count)
+        .sum();
+  }
+
+  /**
+   * Logs the metric values at the INFO level one metric per line.
+   * The output format for each metric is:
+   * {@code <name> {<tag.key>=<tag.value>,...} : <value> <unit>}
+   * Meters that are not counters are logged with the value "unknown".
+   */
+  static void logMetrics() {
+    registry.getMeters().stream()
+        // Sort by id to ensure the same order each time.
+        .sorted(Comparator.comparing(m -> m.getId().toString()))
+        .forEach(m -> {
+          // Generate tags string as {k=v,...}
+          String tags = m.getId().getTags().stream()
+              .sorted()
+              .map(t -> t.getKey() + "=" + t.getValue())
+              .collect(joining(",", "{", "}"));
+          String key = m.getId().getName() + " " + tags;
+          String value = "unknown";
+          if (m instanceof Counter) {
+            value = ((Counter) m).count() + " " + m.getId().getBaseUnit();
+          }
+          LOG.info("{} : {}", key, value);
+        });
+  }
+
+  /**
+   * The name, description, and unit that identify a metric.
+   */
+  private static final class KuduMetricId {
+    private final String name;
+    private final String description;
+    private final String unit;
+
+    private KuduMetricId(String name, String description, String unit) {
+      this.name = name;
+      this.description = description;
+      this.unit = unit;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+
+    public String getUnit() {
+      return unit;
+    }
+  }
+}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 52dd786..36b1737 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -26,6 +26,11 @@
package org.apache.kudu.client;
+import static org.apache.kudu.client.KuduMetrics.RPC_REQUESTS_METRIC;
+import static org.apache.kudu.client.KuduMetrics.RPC_RESPONSE_METRIC;
+import static org.apache.kudu.client.KuduMetrics.RPC_RETRIES_METRIC;
+import static org.apache.kudu.client.KuduMetrics.counter;
+
import java.util.Set;
import javax.annotation.Nonnull;
@@ -114,6 +119,10 @@ class RpcProxy {
static <R> void sendRpc(final AsyncKuduClient client,
final Connection connection,
final KuduRpc<R> rpc) {
+ counter(RPC_REQUESTS_METRIC, rpcTags(client, connection, rpc)).increment();
+ if (rpc.attempt > 1) {
+ counter(RPC_RETRIES_METRIC, rpcTags(client, connection,
rpc)).increment();
+ }
try {
// Throw an exception to enable testing failures. See `failNextRpcs`.
if (staticNumFail > 0) {
@@ -224,7 +233,7 @@ class RpcProxy {
response.getTotalResponseSize(), rpc);
}
}
-
+ counter(RPC_RESPONSE_METRIC, rpcTags(client, connection, rpc)).increment();
RpcTraceFrame.RpcTraceFrameBuilder traceBuilder = new
RpcTraceFrame.RpcTraceFrameBuilder(
rpc.method(), RpcTraceFrame.Action.RECEIVE_FROM_SERVER).serverInfo(
connection.getServerInfo());
@@ -441,4 +450,15 @@ class RpcProxy {
Connection getConnection() {
return connection;
}
+
+ private static String[] rpcTags(final AsyncKuduClient client,
+ final Connection connection,
+ final KuduRpc<?> rpc) {
+ return new String[] {
+ KuduMetrics.SERVICE_NAME_TAG, rpc.serviceName(),
+ KuduMetrics.METHOD_NAME_TAG, rpc.method(),
+ KuduMetrics.SERVER_ID_TAG, connection.getServerInfo().getUuid(),
+ KuduMetrics.CLIENT_ID_TAG, client.getClientId()
+ };
+ }
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduMetrics.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduMetrics.java
new file mode 100644
index 0000000..09b989d
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduMetrics.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.noop.NoopCounter;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.kudu.test.KuduTestHarness;
+
+public class TestKuduMetrics {
+
+ @Rule
+ public KuduTestHarness harness = new KuduTestHarness();
+
+ @Test
+ public void testDisabledMetrics() throws Exception {
+ KuduMetrics.setEnabled(false);
+
+ // Creating a meter results in a no-op meter that always returns 0.
+ Counter foo =
+ KuduMetrics.counter(KuduMetrics.RPC_RESPONSE_METRIC,
KuduMetrics.CLIENT_ID_TAG, "foo");
+ Assert.assertTrue(foo instanceof NoopCounter);
+ foo.increment();
+ Assert.assertEquals(0, (int) foo.count());
+
+ // The registry doesn't have any meters.
+ Assert.assertEquals(0, KuduMetrics.numMetrics());
+ }
+
+ @Test
+ public void testClientIdFilter() throws Exception {
+ KuduClient c1 = new
KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()).build();
+ KuduClient c2 = new
KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()).build();
+
+ c1.createTable("c1-table", getBasicSchema(), getBasicCreateTableOptions());
+ c2.createTable("c2-table", getBasicSchema(), getBasicCreateTableOptions());
+
+ String c1Id = c1.getClientId();
+ String c2Id = c1.getClientId();
+
+ int totalNumMetrics = KuduMetrics.numMetrics();
+ int c1NumMetrics = KuduMetrics.numMetrics(KuduMetrics.CLIENT_ID_TAG, c1Id);
+ int c2NumMetrics = KuduMetrics.numMetrics(KuduMetrics.CLIENT_ID_TAG, c2Id);
+
+ KuduMetrics.logMetrics(); // Log the metric values to help debug failures.
+ Assert.assertEquals(totalNumMetrics, c1NumMetrics + c2NumMetrics);
+
+ // Disable the metrics and validate they are cleared.
+ KuduMetrics.setEnabled(false);
+ Assert.assertEquals(0, KuduMetrics.numMetrics());
+ // Re-enable and verify they remain cleared.
+ KuduMetrics.setEnabled(true);
+ Assert.assertEquals(0, KuduMetrics.numMetrics());
+ }
+}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index d3611f0..80c7141 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -24,12 +24,15 @@ import static
org.apache.kudu.test.ClientTestUtil.createDefaultTable;
import static org.apache.kudu.test.ClientTestUtil.createManyStringsSchema;
import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
import static org.apache.kudu.test.ClientTestUtil.loadDefaultTable;
+import static org.apache.kudu.test.MetricTestUtils.totalRequestCount;
+import static org.apache.kudu.test.MetricTestUtils.validateRequestCount;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import com.google.common.collect.ImmutableList;
@@ -477,10 +480,51 @@ public class TestScanToken {
KuduScanner scanner = tokens.get(0).intoScanner(client);
- // TODO(wdberkeley): Handle renaming a column between when the token is
rehydrated as a scanner
+ // TODO(KUDU-3146): Handle renaming a column between when the token is
rehydrated as a scanner
// and when the scanner first hits a replica. Note that this is almost
certainly a very
// short period of vulnerability.
checkDiffScanResults(scanner, 3 * numRows / 4, numRows / 4);
}
+
+  /**
+   * Validates the RPC requests sent when a scan token is built, hydrated into
+   * a scanner by a second client, and then scanned.
+   */
+  @Test
+  public void testScanTokenRequests() throws Exception {
+    Schema schema = getBasicSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    // Use a new client to simulate hydrating in a new process.
+    // KuduClient is AutoCloseable, so close it even if an assertion fails.
+    try (KuduClient newClient =
+             new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()).build()) {
+      newClient.getTablesList(); // List the tables to prevent counting initialization RPCs.
+
+      KuduMetrics.logMetrics(); // Log the metric values to help debug failures.
+      long beforeRequests = totalRequestCount();
+
+      // Validate that building a scan token results in a single GetTableLocations request.
+      KuduScanToken token = validateRequestCount(1, client.getClientId(),
+          "GetTableLocations", () -> {
+            KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+            List<KuduScanToken> tokens = tokenBuilder.build();
+            assertEquals(1, tokens.size());
+            return tokens.get(0);
+          });
+
+      // Validate that hydrating a token into a scanner results in a single
+      // GetTableSchema request.
+      KuduScanner scanner = validateRequestCount(1, newClient.getClientId(), "GetTableSchema",
+          () -> token.intoScanner(newClient));
+
+      // Validate that starting to scan results in a GetTableLocations request
+      // and a Scan request.
+      validateRequestCount(2, newClient.getClientId(),
+          Arrays.asList("GetTableLocations", "Scan"),
+          scanner::nextRows);
+
+      long afterRequests = totalRequestCount();
+
+      // Validate no other unexpected requests were sent.
+      // GetTableLocations x 2, GetTableSchema, Scan.
+      KuduMetrics.logMetrics(); // Log the metric values to help debug failures.
+      assertEquals(4, afterRequests - beforeRequests);
+    }
+  }
}
diff --git
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
index beb377f..2b65844 100644
---
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
+++
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -40,6 +40,7 @@ import
org.apache.kudu.client.AsyncKuduClient.AsyncKuduClientBuilder;
import org.apache.kudu.client.HostAndPort;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduMetrics;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.LocatedTablet;
import org.apache.kudu.client.RemoteTablet;
@@ -138,6 +139,8 @@ public class KuduTestHarness extends ExternalResource {
@Override
public void before() throws Exception {
FakeDNS.getInstance().install();
+ // Enable the client metrics for tests.
+ KuduMetrics.setEnabled(true);
LOG.info("Creating a new MiniKuduCluster...");
miniCluster = clusterBuilder.build();
LOG.info("Creating a new Kudu client...");
diff --git
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/MetricTestUtils.java
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/MetricTestUtils.java
new file mode 100644
index 0000000..c802c4a
--- /dev/null
+++
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/MetricTestUtils.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.client.KuduMetrics;
+
+/**
+ * Test helpers for reading and validating the "rpc.requests" client metrics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MetricTestUtils {
+
+  /**
+   * @return the total sum of "rpc.requests" metrics
+   */
+  public static long totalRequestCount() {
+    return (long) KuduMetrics.totalCount(KuduMetrics.RPC_REQUESTS_METRIC);
+  }
+
+  /**
+   * Validates that the count change in the "rpc.requests" metrics of the given
+   * client, across all RPC methods, matches the expectedCount when the
+   * callable f is called.
+   *
+   * @param expectedCount the expected count
+   * @param clientId the clientId to filter on
+   * @param f the callable to call and validate
+   * @param <T> the return type
+   * @return the return value from f
+   * @throws Exception when f throws an exception
+   */
+  public static <T> T validateRequestCount(int expectedCount, String clientId,
+                                           Callable<T> f) throws Exception {
+    return validateRequestCount(expectedCount, clientId, Collections.emptyList(), f);
+  }
+
+  /**
+   * Validates that the count change in the matching "rpc.requests" metrics matches
+   * the expectedCount when the callable f is called.
+   *
+   * @param expectedCount the expected count
+   * @param clientId the clientId to filter on
+   * @param rpcMethodName the rpc method name to filter on
+   * @param f the callable to call and validate
+   * @param <T> the return type
+   * @return the return value from f
+   * @throws Exception when f throws an exception
+   */
+  public static <T> T validateRequestCount(int expectedCount, String clientId,
+                                           String rpcMethodName, Callable<T> f) throws Exception {
+    return validateRequestCount(expectedCount, clientId,
+        Collections.singletonList(rpcMethodName), f);
+  }
+
+  /**
+   * Validates that the count change in the matching "rpc.requests" metrics matches
+   * the expectedCount when the callable f is called.
+   *
+   * @param expectedCount the expected count
+   * @param clientId the clientId to filter on
+   * @param rpcMethodNames the rpc method names to filter on; when empty, the
+   *                       requests of every rpc method are counted
+   * @param f the callable to call and validate
+   * @param <T> the return type
+   * @return the return value from f
+   * @throws Exception when f throws an exception
+   */
+  public static <T> T validateRequestCount(int expectedCount, String clientId,
+                                           List<String> rpcMethodNames, Callable<T> f)
+      throws Exception {
+    if (rpcMethodNames.isEmpty()) {
+      // No method filter: compare the client's total request count. Iterating
+      // the empty list would otherwise always measure a change of zero.
+      long before = requestCount(clientId);
+      T result = f.call();
+      assertEquals(expectedCount, requestCount(clientId) - before);
+      return result;
+    }
+
+    // Snapshot the per-method counts; the map also collapses duplicate names.
+    Map<String, Long> beforeMap = new HashMap<>();
+    for (String rpcMethodName : rpcMethodNames) {
+      beforeMap.put(rpcMethodName, requestCount(clientId, rpcMethodName));
+    }
+    T result = f.call();
+    long count = 0;
+    for (Map.Entry<String, Long> entry : beforeMap.entrySet()) {
+      count += requestCount(clientId, entry.getKey()) - entry.getValue();
+    }
+    assertEquals(expectedCount, count);
+    return result;
+  }
+
+  /**
+   * @return the sum of the "rpc.requests" metrics recorded for the client
+   */
+  private static long requestCount(String clientId) {
+    return (long) KuduMetrics.totalCount(KuduMetrics.RPC_REQUESTS_METRIC,
+        KuduMetrics.CLIENT_ID_TAG, clientId);
+  }
+
+  /**
+   * @return the sum of the "rpc.requests" metrics recorded for the client and rpc method
+   */
+  private static long requestCount(String clientId, String rpcMethodName) {
+    return (long) KuduMetrics.totalCount(KuduMetrics.RPC_REQUESTS_METRIC,
+        KuduMetrics.CLIENT_ID_TAG, clientId, KuduMetrics.METHOD_NAME_TAG, rpcMethodName);
+  }
+
+}