This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 03f266854a6ab7ff1732d380c722ba64c018ecc1 Author: Riza Suminto <[email protected]> AuthorDate: Tue May 3 17:38:55 2022 -0700 [client] KUDU-3365: Expose INSERT/UPDATE metrics in the Java API The work done in the scope of KUDU-3351 included the server-side changes and corresponding changes in the Kudu C++ API to expose the metrics to the client applications. This patch implements similar changes to expose such metrics in the Java client API. Change-Id: I956eb0c0a2cadcf3491550630b861bb48462e8eb Reviewed-on: http://gerrit.cloudera.org:8080/18489 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- .../org/apache/kudu/client/AsyncKuduSession.java | 12 +++++++++ .../main/java/org/apache/kudu/client/Batch.java | 5 +++- .../java/org/apache/kudu/client/BatchResponse.java | 17 +++++++++++- .../java/org/apache/kudu/client/KuduSession.java | 5 ++++ .../java/org/apache/kudu/client/Operation.java | 5 +++- .../org/apache/kudu/client/OperationResponse.java | 17 +++++++++++- .../org/apache/kudu/client/ResourceMetrics.java | 28 +++++++++++++++++++- .../apache/kudu/client/SessionConfiguration.java | 6 +++++ .../org/apache/kudu/client/TestKuduSession.java | 30 ++++++++++++++++++++++ 9 files changed, 120 insertions(+), 5 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java index 07397330e..9c0ea6912 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java @@ -183,6 +183,11 @@ public class AsyncKuduSession implements SessionConfiguration { private boolean ignoreAllDuplicateRows = false; private boolean ignoreAllNotFoundRows = false; + /** + * Cumulative operation metrics since the beginning of the session. 
+ */ + private final ResourceMetrics writeOpMetrics = new ResourceMetrics(); + /** * Package-private constructor meant to be used via AsyncKuduClient * @param client client that creates this session @@ -317,6 +322,11 @@ public class AsyncKuduSession implements SessionConfiguration { return errorCollector.getErrors(); } + @Override + public ResourceMetrics getWriteOpMetrics() { + return this.writeOpMetrics; + } + /** * Flushes the buffered operations and marks this session as closed. * See the javadoc on {@link #flush()} on how to deal with exceptions coming out of this method. @@ -452,6 +462,7 @@ public class AsyncKuduSession implements SessionConfiguration { // are visible should the callback interrogate the error collector. operationResponse.getOperation().callback(operationResponse); } + writeOpMetrics.update(response.getWriteOpMetrics()); return response; } @@ -624,6 +635,7 @@ public class AsyncKuduSession implements SessionConfiguration { return client.sendRpcToTablet(operation) .addCallbackDeferring(resp -> { client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw()); + writeOpMetrics.update(resp.getWriteOpMetrics()); return Deferred.fromResult(resp); }) .addErrback(new SingleOperationErrCallback(operation)); diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java index 0c5c578b6..a248802f8 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java @@ -184,12 +184,15 @@ class Batch extends KuduRpc<BatchResponse> { } errorsPB = filteredErrors; } + ResourceMetrics metrics = builder.hasResourceMetrics() ? 
+ ResourceMetrics.fromResourceMetricsPB(builder.getResourceMetrics()) : null; BatchResponse response = new BatchResponse(timeoutTracker.getElapsedMillis(), tsUUID, builder.getTimestamp(), errorsPB, operations, - operationIndexes); + operationIndexes, + metrics); if (injectedError != null) { if (injectedlatencyMs > 0) { diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java index 24078eb4c..f11f930b3 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java @@ -20,6 +20,7 @@ package org.apache.kudu.client; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import javax.annotation.Nullable; import com.google.common.collect.ImmutableList; import org.apache.yetus.audience.InterfaceAudience; @@ -37,6 +38,7 @@ public class BatchResponse extends KuduRpcResponse { private final List<RowError> rowErrors; private final List<OperationResponse> individualResponses; private final List<Integer> responsesIndexes; + private final ResourceMetrics writeOpMetrics; /** * Package-private constructor to be used by the RPCs. 
@@ -45,13 +47,15 @@ public class BatchResponse extends KuduRpcResponse { * @param errorsPB a list of row errors, can be empty * @param operations the list of operations which created this response * @param indexes the list of operations' order index + * @param writeOpMetrics the write operation metrics, can be null */ BatchResponse(long elapsedMillis, String tsUUID, long writeTimestamp, List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB, List<Operation> operations, - List<Integer> indexes) { + List<Integer> indexes, + ResourceMetrics writeOpMetrics) { super(elapsedMillis, tsUUID); this.writeTimestamp = writeTimestamp; individualResponses = new ArrayList<>(operations.size()); @@ -61,6 +65,7 @@ public class BatchResponse extends KuduRpcResponse { } else { rowErrors = new ArrayList<>(errorsPB.size()); } + this.writeOpMetrics = writeOpMetrics; // Populate the list of individual row responses and the list of row errors. Not all the rows // maybe have errors, but 'errorsPB' contains them in the same order as the operations that @@ -95,6 +100,7 @@ public class BatchResponse extends KuduRpcResponse { rowErrors = ImmutableList.of(); this.individualResponses = individualResponses; this.responsesIndexes = indexes; + this.writeOpMetrics = null; } /** @@ -122,4 +128,13 @@ public class BatchResponse extends KuduRpcResponse { return responsesIndexes; } + /** + * Return the write operation metrics associated with this batch. + * @return write operation metrics associated with this batch, or null if there is none. 
+ */ + @Nullable + ResourceMetrics getWriteOpMetrics() { + return this.writeOpMetrics; + } + } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java index 8f0870ef7..b1845bd5f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java @@ -203,4 +203,9 @@ public class KuduSession implements SessionConfiguration { public RowErrorsAndOverflowStatus getPendingErrors() { return session.getPendingErrors(); } + + @Override + public ResourceMetrics getWriteOpMetrics() { + return session.getWriteOpMetrics(); + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java index 4d871c9ad..0ed9caca2 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java @@ -223,11 +223,14 @@ public abstract class Operation extends KuduRpc<OperationResponse> { error = null; } } + Tserver.ResourceMetricsPB metricsPB = builder.hasResourceMetrics() ? + builder.getResourceMetrics() : null; OperationResponse response = new OperationResponse(timeoutTracker.getElapsedMillis(), tsUUID, builder.getTimestamp(), this, - error); + error, + metricsPB); return new Pair<>(response, builder.hasError() ? 
builder.getError() : null); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java index d4234ac67..ca46387bf 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java @@ -19,6 +19,7 @@ package org.apache.kudu.client; import java.util.ArrayList; import java.util.List; +import javax.annotation.Nullable; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -32,6 +33,7 @@ public class OperationResponse extends KuduRpcResponse { private final long writeTimestamp; private final RowError rowError; private final Operation operation; + private final ResourceMetrics writeOpMetrics; /** * Package-private constructor to build an OperationResponse with a row error in the pb format. @@ -44,11 +46,14 @@ public class OperationResponse extends KuduRpcResponse { String tsUUID, long writeTimestamp, Operation operation, - Tserver.WriteResponsePB.PerRowErrorPB errorPB) { + Tserver.WriteResponsePB.PerRowErrorPB errorPB, + Tserver.ResourceMetricsPB metricsPB) { super(elapsedMillis, tsUUID); this.writeTimestamp = writeTimestamp; this.rowError = errorPB == null ? null : RowError.fromRowErrorPb(errorPB, operation, tsUUID); this.operation = operation; + this.writeOpMetrics = metricsPB == null ? + null : ResourceMetrics.fromResourceMetricsPB(metricsPB); } /** @@ -67,6 +72,7 @@ public class OperationResponse extends KuduRpcResponse { this.writeTimestamp = writeTimestamp; this.rowError = rowError; this.operation = operation; + this.writeOpMetrics = null; } /** @@ -116,4 +122,13 @@ public class OperationResponse extends KuduRpcResponse { Operation getOperation() { return operation; } + + /** + * Return the write operation metrics associated with this batch. 
+ * @return write operation metrics associated with this batch, or null if there is none. + */ + @Nullable + ResourceMetrics getWriteOpMetrics() { + return this.writeOpMetrics; + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java index 01ee22b93..1e7e7cb22 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java @@ -35,7 +35,7 @@ import org.apache.kudu.tserver.Tserver.ResourceMetricsPB; * A container for scanner resource metrics. * <p> * This class wraps a mapping from metric name to metric value for server-side - * metrics associated with a scanner. + * metrics associated with a scanner and write operation. */ @InterfaceAudience.Public @InterfaceStability.Evolving @@ -75,6 +75,20 @@ public class ResourceMetrics { } } + /** + * Increment this instance's metric values with those found in 'resourceMetrics'. + * Noop if 'resourceMetrics' is null. + * @param resourceMetrics resource metrics protobuf object to be used to update this object. + * Can be null, which will not do anything. + */ + void update(ResourceMetrics resourceMetrics) { + if (resourceMetrics != null) { + for (Map.Entry<String, LongAdder> entry : resourceMetrics.metrics.entrySet()) { + increment(entry.getKey(), entry.getValue().sum()); + } + } + } + /** * Increment the metric value by the specific amount. * @param name the name of the metric whose value is to be incremented @@ -83,4 +97,16 @@ public class ResourceMetrics { private void increment(String name, long amount) { metrics.computeIfAbsent(name, k -> new LongAdder()).add(amount); } + + /** + * Converts a ResourceMetricsPB into a ResourceMetrics. + * @param resourceMetricsPb a resource metrics in its PB format. Must not be null. 
+ * @return a ResourceMetrics + */ + static ResourceMetrics fromResourceMetricsPB(ResourceMetricsPB resourceMetricsPb) { + Preconditions.checkNotNull(resourceMetricsPb); + ResourceMetrics result = new ResourceMetrics(); + result.update(resourceMetricsPb); + return result; + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java index d3e942d0a..dfb756652 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java @@ -207,4 +207,10 @@ public interface SessionConfiguration { * @return an object that contains the errors and the overflow status */ RowErrorsAndOverflowStatus getPendingErrors(); + + /** + * Return cumulative write operation metrics since the beginning of the session. + * @return cumulative write operation metrics since the beginning of the session. 
+ */ + ResourceMetrics getWriteOpMetrics(); } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java index 062fc6747..905ccbe74 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java @@ -344,6 +344,24 @@ public class TestKuduSession { } } + private void doVerifyMetrics(KuduSession session, + long successfulInserts, + long insertIgnoreErrors, + long successfulUpserts, + long successfulUpdates, + long updateIgnoreErrors, + long successfulDeletes, + long deleteIgnoreErrors) { + ResourceMetrics metrics = session.getWriteOpMetrics(); + assertEquals(successfulInserts, metrics.getMetric("successful_inserts")); + assertEquals(insertIgnoreErrors, metrics.getMetric("insert_ignore_errors")); + assertEquals(successfulUpserts, metrics.getMetric("successful_upserts")); + assertEquals(successfulUpdates, metrics.getMetric("successful_updates")); + assertEquals(updateIgnoreErrors, metrics.getMetric("update_ignore_errors")); + assertEquals(successfulDeletes, metrics.getMetric("successful_deletes")); + assertEquals(deleteIgnoreErrors, metrics.getMetric("delete_ignore_errors")); + } + @Test(timeout = 10000) public void testUpsert() throws Exception { KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions()); @@ -358,6 +376,7 @@ public class TestKuduSession { "INT32 key=1, INT32 column1_i=1, INT32 column2_i=3, " + "STRING column3_s=a string, BOOL column4_b=true", rowStrings.get(0)); + doVerifyMetrics(session, 0, 0, 1, 0, 0, 0, 0); // Test an Upsert that acts as an Update. 
assertFalse(session.apply(createUpsert(table, 1, 2, false)).hasRowError()); @@ -366,6 +385,7 @@ public class TestKuduSession { "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " + "STRING column3_s=a string, BOOL column4_b=true", rowStrings.get(0)); + doVerifyMetrics(session, 0, 0, 2, 0, 0, 0, 0); } @Test(timeout = 10000) @@ -378,6 +398,7 @@ public class TestKuduSession { session.apply(createUpsert(table, 1, 1, false)); session.apply(createInsertIgnore(table, 1)); List<OperationResponse> results = session.flush(); + doVerifyMetrics(session, 1, 1, 1, 0, 0, 0, 0); for (OperationResponse result : results) { assertFalse(result.toString(), result.hasRowError()); } @@ -398,6 +419,7 @@ public class TestKuduSession { session.apply(createInsertIgnore(table, 1)); session.apply(createInsert(table, 1)); List<OperationResponse> results = session.flush(); + doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0); assertFalse(results.get(0).toString(), results.get(0).hasRowError()); assertTrue(results.get(1).toString(), results.get(1).hasRowError()); assertTrue(results.get(1).getRowError().getErrorStatus().isAlreadyPresent()); @@ -421,6 +443,7 @@ public class TestKuduSession { "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " + "STRING column3_s=a string, BOOL column4_b=true", rowStrings.get(0)); + doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0); // Test insert ignore does not return a row error. assertFalse(session.apply(createInsertIgnore(table, 1)).hasRowError()); @@ -429,6 +452,7 @@ public class TestKuduSession { "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " + "STRING column3_s=a string, BOOL column4_b=true", rowStrings.get(0)); + doVerifyMetrics(session, 1, 1, 0, 0, 0, 0, 0); } @@ -440,9 +464,11 @@ public class TestKuduSession { // Test update ignore does not return a row error. 
assertFalse(session.apply(createUpdateIgnore(table, 1, 1, false)).hasRowError()); assertEquals(0, scanTableToStrings(table).size()); + doVerifyMetrics(session, 0, 0, 0, 0, 1, 0, 0); assertFalse(session.apply(createInsert(table, 1)).hasRowError()); assertEquals(1, scanTableToStrings(table).size()); + doVerifyMetrics(session, 1, 0, 0, 0, 1, 0, 0); // Test update ignore implements normal update. assertFalse(session.apply(createUpdateIgnore(table, 1, 2, false)).hasRowError()); @@ -452,6 +478,7 @@ public class TestKuduSession { "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " + "STRING column3_s=a string, BOOL column4_b=true", rowStrings.get(0)); + doVerifyMetrics(session, 1, 0, 0, 1, 1, 0, 0); } @Test(timeout = 10000) @@ -461,13 +488,16 @@ public class TestKuduSession { // Test delete ignore does not return a row error. assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError()); + doVerifyMetrics(session, 0, 0, 0, 0, 0, 0, 1); assertFalse(session.apply(createInsert(table, 1)).hasRowError()); assertEquals(1, scanTableToStrings(table).size()); + doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 1); // Test delete ignore implements normal delete. assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError()); assertEquals(0, scanTableToStrings(table).size()); + doVerifyMetrics(session, 1, 0, 0, 0, 0, 1, 1); } @Test(timeout = 10000)
