This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b06cd897d2 Fix BigQuerySinkMetrics constants and increment metrics in 
more places.  (#30067)
0b06cd897d2 is described below

commit 0b06cd897d2016e702452e16bc66018528896098
Author: JayajP <[email protected]>
AuthorDate: Tue Jan 23 12:51:09 2024 -0800

    Fix BigQuerySinkMetrics constants and increment metrics in more places.  
(#30067)
    
    * Fix bigquerysinkmetrics constants and increment metrics in a few more 
places
    
    * Spotless
---
 .../sdk/io/gcp/bigquery/BigQuerySinkMetrics.java   | 12 ++++----
 .../bigquery/StorageApiWritesShardedRecords.java   |  5 ++--
 .../io/gcp/bigquery/BigQuerySinkMetricsTest.java   | 35 ++++++++++++++--------
 3 files changed, 31 insertions(+), 21 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
index a89707c1919..c0f470b3921 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
@@ -53,9 +53,9 @@ public class BigQuerySinkMetrics {
   public static final String PAYLOAD_TOO_LARGE = "PayloadTooLarge";
 
   // Base Metric names
-  private static final String RPC_REQUESTS = "RpcRequests";
+  private static final String RPC_REQUESTS = "RpcRequestsCount";
   private static final String RPC_LATENCY = "RpcLatency";
-  private static final String APPEND_ROWS_ROW_STATUS = "AppendRowsRowStatus";
+  private static final String APPEND_ROWS_ROW_STATUS = "RowsAppendedCount";
   private static final String THROTTLED_TIME = "ThrottledTime";
 
   // StorageWriteAPI Method names
@@ -73,10 +73,10 @@ public class BigQuerySinkMetrics {
   }
 
   // Metric labels
-  private static final String TABLE_ID_LABEL = "TableId";
-  private static final String RPC_STATUS_LABEL = "RpcStatus";
-  private static final String RPC_METHOD = "Method";
-  private static final String ROW_STATUS = "RowStatus";
+  private static final String TABLE_ID_LABEL = "table_id";
+  private static final String RPC_STATUS_LABEL = "rpc_status";
+  private static final String RPC_METHOD = "rpc_method";
+  private static final String ROW_STATUS = "row_status";
 
   // Delimiters
   private static final char LABEL_DELIMITER = ';';
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 0f9b07d0c40..b612f199a29 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -652,7 +652,7 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
             AppendRowsContext failedContext =
                 
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
             BigQuerySinkMetrics.reportFailedRPCMetrics(
-                failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+                failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, 
shortTableId);
             String errorCode =
                 
BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
 
@@ -801,7 +801,8 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
             BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
                 context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, 
shortTableId);
             BigQuerySinkMetrics.appendRowsRowStatusCounter(
-                BigQuerySinkMetrics.RowStatus.SUCCESSFUL, 
BigQuerySinkMetrics.OK, shortTableId);
+                    BigQuerySinkMetrics.RowStatus.SUCCESSFUL, 
BigQuerySinkMetrics.OK, shortTableId)
+                .inc(flushedRows);
 
             if (successfulRowsTag != null) {
               for (int i = 0; i < context.protoRows.getSerializedRowsCount(); 
++i) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 9c6fae164fc..6b04ed0acc1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -112,7 +112,7 @@ public class BigQuerySinkMetricsTest {
     deletesDisabledCounter.inc();
     MetricName deletesDisabledCounterName =
         MetricName.named(
-            "BigQuerySink", 
"AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;");
+            "BigQuerySink", 
"RowsAppendedCount-row_status:SUCCESSFUL;rpc_status:rpcStatus;");
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(deletesDisabledCounterName));
     assertThat(
         
testContainer.perWorkerCounters.get(deletesDisabledCounterName).getCumulative(),
@@ -127,7 +127,7 @@ public class BigQuerySinkMetricsTest {
     MetricName deletesEnabledCounterName =
         MetricName.named(
             "BigQuerySink",
-            
"AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;TableId:tableId;");
+            
"RowsAppendedCount-row_status:SUCCESSFUL;rpc_status:rpcStatus;table_id:tableId;");
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(deletesEnabledCounterName));
     assertThat(
         
testContainer.perWorkerCounters.get(deletesEnabledCounterName).getCumulative(),
@@ -160,8 +160,9 @@ public class BigQuerySinkMetricsTest {
     appendRowsThrottleCounter.inc(1);
     assertThat(
         appendRowsThrottleCounter.getName().getName(),
-        equalTo("ThrottledTime-Method:APPEND_ROWS;"));
-    MetricName counterName = MetricName.named("BigQuerySink", 
"ThrottledTime-Method:APPEND_ROWS;");
+        equalTo("ThrottledTime-rpc_method:APPEND_ROWS;"));
+    MetricName counterName =
+        MetricName.named("BigQuerySink", 
"ThrottledTime-rpc_method:APPEND_ROWS;");
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterName));
     
assertThat(testContainer.perWorkerCounters.get(counterName).getCumulative(), 
equalTo(1L));
   }
@@ -181,8 +182,9 @@ public class BigQuerySinkMetricsTest {
     BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
         c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
     MetricName counterNameDisabledDeletes =
-        MetricName.named("BigQuerySink", 
"RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;");
-    MetricName histogramName = MetricName.named("BigQuerySink", 
"RpcLatency-Method:APPEND_ROWS;");
+        MetricName.named("BigQuerySink", 
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:OK;");
+    MetricName histogramName =
+        MetricName.named("BigQuerySink", "RpcLatency-rpc_method:APPEND_ROWS;");
     HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameDisabledDeletes));
     assertThat(
@@ -199,7 +201,8 @@ public class BigQuerySinkMetricsTest {
         c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
     MetricName counterNameEnabledDeletes =
         MetricName.named(
-            "BigQuerySink", 
"RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;TableId:tableId;");
+            "BigQuerySink",
+            
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:OK;table_id:tableId;");
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameEnabledDeletes));
     assertThat(
         
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
@@ -228,8 +231,10 @@ public class BigQuerySinkMetricsTest {
     BigQuerySinkMetrics.reportFailedRPCMetrics(
         c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
     MetricName counterNameDisabledDeletes =
-        MetricName.named("BigQuerySink", 
"RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;");
-    MetricName histogramName = MetricName.named("BigQuerySink", 
"RpcLatency-Method:APPEND_ROWS;");
+        MetricName.named(
+            "BigQuerySink", 
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;");
+    MetricName histogramName =
+        MetricName.named("BigQuerySink", "RpcLatency-rpc_method:APPEND_ROWS;");
     HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameDisabledDeletes));
     assertThat(
@@ -249,7 +254,8 @@ public class BigQuerySinkMetricsTest {
         c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
     MetricName counterNameEnabledDeletes =
         MetricName.named(
-            "BigQuerySink", 
"RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;TableId:tableId;");
+            "BigQuerySink",
+            
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;table_id:tableId;");
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameEnabledDeletes));
     assertThat(
         
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
@@ -277,8 +283,10 @@ public class BigQuerySinkMetricsTest {
     BigQuerySinkMetrics.reportFailedRPCMetrics(
         c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
     MetricName counterNameDisabledDeletes =
-        MetricName.named("BigQuerySink", 
"RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;");
-    MetricName histogramName = MetricName.named("BigQuerySink", 
"RpcLatency-Method:APPEND_ROWS;");
+        MetricName.named(
+            "BigQuerySink", 
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;");
+    MetricName histogramName =
+        MetricName.named("BigQuerySink", "RpcLatency-rpc_method:APPEND_ROWS;");
     HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameDisabledDeletes));
     assertThat(
@@ -295,7 +303,8 @@ public class BigQuerySinkMetricsTest {
         c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
     MetricName counterNameEnabledDeletes =
         MetricName.named(
-            "BigQuerySink", 
"RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;TableId:tableId;");
+            "BigQuerySink",
+            
"RpcRequestsCount-rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;table_id:tableId;");
     assertThat(testContainer.perWorkerCounters, 
IsMapContaining.hasKey(counterNameEnabledDeletes));
     assertThat(
         
testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),

Reply via email to