This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0b06cd897d2 Fix BigQuerySinkMetrics constants and increment metrics in
more places. (#30067)
0b06cd897d2 is described below
commit 0b06cd897d2016e702452e16bc66018528896098
Author: JayajP <[email protected]>
AuthorDate: Tue Jan 23 12:51:09 2024 -0800
Fix BigQuerySinkMetrics constants and increment metrics in more places.
(#30067)
* Fix BigQuerySinkMetrics constants and increment metrics in a few more
places
* Apply Spotless formatting
---
.../sdk/io/gcp/bigquery/BigQuerySinkMetrics.java | 12 ++++----
.../bigquery/StorageApiWritesShardedRecords.java | 5 ++--
.../io/gcp/bigquery/BigQuerySinkMetricsTest.java | 35 ++++++++++++++--------
3 files changed, 31 insertions(+), 21 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
index a89707c1919..c0f470b3921 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
@@ -53,9 +53,9 @@ public class BigQuerySinkMetrics {
public static final String PAYLOAD_TOO_LARGE = "PayloadTooLarge";
// Base Metric names
- private static final String RPC_REQUESTS = "RpcRequests";
+ private static final String RPC_REQUESTS = "RpcRequestsCount";
private static final String RPC_LATENCY = "RpcLatency";
- private static final String APPEND_ROWS_ROW_STATUS = "AppendRowsRowStatus";
+ private static final String APPEND_ROWS_ROW_STATUS = "RowsAppendedCount";
private static final String THROTTLED_TIME = "ThrottledTime";
// StorageWriteAPI Method names
@@ -73,10 +73,10 @@ public class BigQuerySinkMetrics {
}
// Metric labels
- private static final String TABLE_ID_LABEL = "TableId";
- private static final String RPC_STATUS_LABEL = "RpcStatus";
- private static final String RPC_METHOD = "Method";
- private static final String ROW_STATUS = "RowStatus";
+ private static final String TABLE_ID_LABEL = "table_id";
+ private static final String RPC_STATUS_LABEL = "rpc_status";
+ private static final String RPC_METHOD = "rpc_method";
+ private static final String ROW_STATUS = "row_status";
// Delimiters
private static final char LABEL_DELIMITER = ';';
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 0f9b07d0c40..b612f199a29 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -652,7 +652,7 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
AppendRowsContext failedContext =
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
BigQuerySinkMetrics.reportFailedRPCMetrics(
- failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS,
shortTableId);
String errorCode =
BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
@@ -801,7 +801,8 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS,
shortTableId);
BigQuerySinkMetrics.appendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.SUCCESSFUL,
BigQuerySinkMetrics.OK, shortTableId);
+ BigQuerySinkMetrics.RowStatus.SUCCESSFUL,
BigQuerySinkMetrics.OK, shortTableId)
+ .inc(flushedRows);
if (successfulRowsTag != null) {
for (int i = 0; i < context.protoRows.getSerializedRowsCount();
++i) {
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 9c6fae164fc..6b04ed0acc1 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -112,7 +112,7 @@ public class BigQuerySinkMetricsTest {
deletesDisabledCounter.inc();
MetricName deletesDisabledCounterName =
MetricName.named(
- "BigQuerySink",
"AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;");
+ "BigQuerySink",
"RowsAppendedCount-row_status:SUCCESSFUL;rpc_status:rpcStatus;");
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(deletesDisabledCounterName));
assertThat(
testContainer.perWorkerCounters.get(deletesDisabledCounterName).getCumulative(),
@@ -127,7 +127,7 @@ public class BigQuerySinkMetricsTest {
MetricName deletesEnabledCounterName =
MetricName.named(
"BigQuerySink",
-
"AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;TableId:tableId;");
+
"RowsAppendedCount-row_status:SUCCESSFUL;rpc_status:rpcStatus;table_id:tableId;");
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(deletesEnabledCounterName));
assertThat(
testContainer.perWorkerCounters.get(deletesEnabledCounterName).getCumulative(),
@@ -160,8 +160,9 @@ public class BigQuerySinkMetricsTest {
appendRowsThrottleCounter.inc(1);
assertThat(
appendRowsThrottleCounter.getName().getName(),
- equalTo("ThrottledTime-Method:APPEND_ROWS;"));
- MetricName counterName = MetricName.named("BigQuerySink",
"ThrottledTime-Method:APPEND_ROWS;");
+ equalTo("ThrottledTime-rpc_method:APPEND_ROWS;"));
+ MetricName counterName =
+ MetricName.named("BigQuerySink",
"ThrottledTime-rpc_method:APPEND_ROWS;");
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterName));
assertThat(testContainer.perWorkerCounters.get(counterName).getCumulative(),
equalTo(1L));
}
@@ -181,8 +182,9 @@ public class BigQuerySinkMetricsTest {
BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
MetricName counterNameDisabledDeletes =
- MetricName.named("BigQuerySink",
"RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;");
- MetricName histogramName = MetricName.named("BigQuerySink",
"RpcLatency-Method:APPEND_ROWS;");
+ MetricName.named("BigQuerySink",
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:OK;");
+ MetricName histogramName =
+ MetricName.named("BigQuerySink", "RpcLatency-rpc_method:APPEND_ROWS;");
HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameDisabledDeletes));
assertThat(
@@ -199,7 +201,8 @@ public class BigQuerySinkMetricsTest {
c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
MetricName counterNameEnabledDeletes =
MetricName.named(
- "BigQuerySink",
"RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;TableId:tableId;");
+ "BigQuerySink",
+
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:OK;table_id:tableId;");
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameEnabledDeletes));
assertThat(
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
@@ -228,8 +231,10 @@ public class BigQuerySinkMetricsTest {
BigQuerySinkMetrics.reportFailedRPCMetrics(
c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
MetricName counterNameDisabledDeletes =
- MetricName.named("BigQuerySink",
"RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;");
- MetricName histogramName = MetricName.named("BigQuerySink",
"RpcLatency-Method:APPEND_ROWS;");
+ MetricName.named(
+ "BigQuerySink",
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;");
+ MetricName histogramName =
+ MetricName.named("BigQuerySink", "RpcLatency-rpc_method:APPEND_ROWS;");
HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameDisabledDeletes));
assertThat(
@@ -249,7 +254,8 @@ public class BigQuerySinkMetricsTest {
c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
MetricName counterNameEnabledDeletes =
MetricName.named(
- "BigQuerySink",
"RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;TableId:tableId;");
+ "BigQuerySink",
+
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;table_id:tableId;");
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameEnabledDeletes));
assertThat(
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
@@ -277,8 +283,10 @@ public class BigQuerySinkMetricsTest {
BigQuerySinkMetrics.reportFailedRPCMetrics(
c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
MetricName counterNameDisabledDeletes =
- MetricName.named("BigQuerySink",
"RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;");
- MetricName histogramName = MetricName.named("BigQuerySink",
"RpcLatency-Method:APPEND_ROWS;");
+ MetricName.named(
+ "BigQuerySink",
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;");
+ MetricName histogramName =
+ MetricName.named("BigQuerySink", "RpcLatency-rpc_method:APPEND_ROWS;");
HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameDisabledDeletes));
assertThat(
@@ -295,7 +303,8 @@ public class BigQuerySinkMetricsTest {
c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
MetricName counterNameEnabledDeletes =
MetricName.named(
- "BigQuerySink",
"RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;TableId:tableId;");
+ "BigQuerySink",
+
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;table_id:tableId;");
assertThat(testContainer.perWorkerCounters,
IsMapContaining.hasKey(counterNameEnabledDeletes));
assertThat(
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),