This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7159beac55f Minor updates to StreamingInsertsMetrics (#31003)
7159beac55f is described below

commit 7159beac55f6b3492bdf84df956cffe4df27fca9
Author: JayajP <[email protected]>
AuthorDate: Wed Apr 17 09:28:53 2024 -0700

    Minor updates to StreamingInsertsMetrics (#31003)
---
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 12 +++--
 .../io/gcp/bigquery/StreamingInsertsMetrics.java   | 63 ++++++----------------
 .../gcp/bigquery/StreamingInsertsMetricsTest.java  | 13 +++--
 3 files changed, 30 insertions(+), 58 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 2dbc02131f5..fa680004a7c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1079,6 +1079,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
       }
       StreamingInsertsMetrics streamingInsertsResults =
           BigQuerySinkMetrics.streamingInsertsMetrics();
+      int numFailedRows = 0;
       final Set<Integer> failedIndices = new HashSet<>();
       long retTotalDataSize = 0;
       List<TableDataInsertAllResponse.InsertErrors> allErrors = new 
ArrayList<>();
@@ -1135,7 +1136,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                           + " pipeline, and the row will be output as a failed 
insert.",
                       nextRowSize));
             } else {
-              streamingInsertsResults.incrementFailedRows();
+              numFailedRows += 1;
               errorContainer.add(failedInserts, error, ref, 
rowsToPublish.get(rowIndex));
               failedIndices.add(rowIndex);
               rowIndex++;
@@ -1223,7 +1224,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                   retryIds.add(idsToPublish.get(errorIndex));
                 }
               } else {
-                streamingInsertsResults.incrementFailedRows();
+                numFailedRows += 1;
                 errorContainer.add(failedInserts, error, ref, 
rowsToPublish.get(errorIndex));
               }
             }
@@ -1234,7 +1235,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
           Thread.currentThread().interrupt();
           throw new IOException("Interrupted while inserting " + 
rowsToPublish);
         } catch (ExecutionException e) {
-          streamingInsertsResults.updateStreamingInsertsMetrics(ref);
+          streamingInsertsResults.updateStreamingInsertsMetrics(
+              ref, rowList.size(), rowList.size());
           throw new RuntimeException(e.getCause());
         }
 
@@ -1276,8 +1278,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
           }
         }
       }
-      streamingInsertsResults.updateSuccessfulAndFailedRows(rowList.size(), 
allErrors.size());
-      streamingInsertsResults.updateStreamingInsertsMetrics(ref);
+      numFailedRows += allErrors.size();
+      streamingInsertsResults.updateStreamingInsertsMetrics(ref, 
rowList.size(), numFailedRows);
       if (!allErrors.isEmpty()) {
         throw new IOException("Insert failed: " + allErrors);
       } else {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
index c451856f06a..2ada2e94dc1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
@@ -40,11 +40,8 @@ public interface StreamingInsertsMetrics {
 
   void updateSuccessfulRpcMetrics(Instant start, Instant end);
 
-  void incrementFailedRows();
-
-  void updateSuccessfulAndFailedRows(int totalRows, int failedRows);
-
-  void updateStreamingInsertsMetrics(@Nullable TableReference tableRef);
+  void updateStreamingInsertsMetrics(
+      @Nullable TableReference tableRef, int totalRows, int failedRows);
 
   /** No-op implementation of {@code StreamingInsertsMetrics}. */
   class NoOpStreamingInsertsMetrics implements StreamingInsertsMetrics {
@@ -60,13 +57,8 @@ public interface StreamingInsertsMetrics {
     public void updateSuccessfulRpcMetrics(Instant start, Instant end) {}
 
     @Override
-    public void incrementFailedRows() {}
-
-    @Override
-    public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {}
-
-    @Override
-    public void updateStreamingInsertsMetrics(@Nullable TableReference 
tableRef) {}
+    public void updateStreamingInsertsMetrics(
+        @Nullable TableReference tableRef, int totalRows, int failedRows) {}
 
     private static NoOpStreamingInsertsMetrics singleton = new 
NoOpStreamingInsertsMetrics();
 
@@ -87,6 +79,10 @@ public interface StreamingInsertsMetrics {
    */
   @AutoValue
   abstract class StreamingInsertsMetricsImpl implements 
StreamingInsertsMetrics {
+    static final Histogram LATENCY_HISTOGRAM =
+        BigQuerySinkMetrics.createRPCLatencyHistogram(
+            BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
+
     abstract ConcurrentLinkedQueue<java.time.Duration> rpcLatencies();
 
     abstract ConcurrentLinkedQueue<String> rpcErrorStatus();
@@ -96,10 +92,6 @@ public interface StreamingInsertsMetrics {
 
     abstract AtomicInteger successfulRpcsCount();
 
-    abstract AtomicInteger successfulRowsCount();
-
-    abstract AtomicInteger failedRowsCount();
-
     abstract AtomicBoolean isWritable();
 
     public static StreamingInsertsMetricsImpl create() {
@@ -108,8 +100,6 @@ public interface StreamingInsertsMetrics {
           new ConcurrentLinkedQueue<>(),
           new ConcurrentLinkedQueue<>(),
           new AtomicInteger(),
-          new AtomicInteger(),
-          new AtomicInteger(),
           new AtomicBoolean(true));
     }
 
@@ -139,31 +129,11 @@ public interface StreamingInsertsMetrics {
       }
     }
 
-    /** Increment the failed rows count by one. */
-    @Override
-    public void incrementFailedRows() {
-      if (isWritable().get()) {
-        failedRowsCount().getAndIncrement();
-      }
-    }
-
-    /** Increment the failed rows count, and set the successful rows count. */
-    @Override
-    public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {
-      if (isWritable().get()) {
-        failedRowsCount().getAndAdd(failedRows);
-        successfulRowsCount().set(totalRows - failedRowsCount().get());
-      }
-    }
-
     /** Record rpc latency histogram metrics. */
     private void recordRpcLatencyMetrics() {
-      Histogram latencyHistogram =
-          BigQuerySinkMetrics.createRPCLatencyHistogram(
-              BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
-      double[] rpcLatencies =
-          rpcLatencies().stream().mapToDouble(duration -> 
duration.toMillis()).toArray();
-      latencyHistogram.update(rpcLatencies);
+      for (Duration d : rpcLatencies()) {
+        LATENCY_HISTOGRAM.update(d.toMillis());
+      }
     }
 
     /**
@@ -174,7 +144,8 @@ public interface StreamingInsertsMetrics {
      * @param tableRef BigQuery table that was written to, return early if 
null.
      */
     @Override
-    public void updateStreamingInsertsMetrics(@Nullable TableReference 
tableRef) {
+    public void updateStreamingInsertsMetrics(
+        @Nullable TableReference tableRef, int totalRows, int failedRows) {
       if (!isWritable().compareAndSet(true, false)) {
         // Metrics have already been exported.
         return;
@@ -221,16 +192,16 @@ public interface StreamingInsertsMetrics {
             .inc(successfulRpcsCount().longValue());
       }
 
-      if (failedRowsCount().get() != 0) {
+      if (failedRows >= 0) {
         BigQuerySinkMetrics.appendRowsRowStatusCounter(
                 BigQuerySinkMetrics.RowStatus.FAILED, 
BigQuerySinkMetrics.INTERNAL, shortTableId)
-            .inc(failedRowsCount().longValue());
+            .inc(failedRows);
       }
 
-      if (successfulRowsCount().get() != 0) {
+      if (totalRows - failedRows >= 0) {
         BigQuerySinkMetrics.appendRowsRowStatusCounter(
                 BigQuerySinkMetrics.RowStatus.SUCCESSFUL, 
BigQuerySinkMetrics.OK, shortTableId)
-            .inc(successfulRowsCount().longValue());
+            .inc(totalRows - failedRows);
       }
 
       recordRpcLatencyMetrics();
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
index 274b6161099..4ba66814419 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
@@ -44,7 +44,7 @@ public class StreamingInsertsMetricsTest {
     results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
     TableReference ref = new 
TableReference().setTableId("t").setDatasetId("d");
 
-    results.updateStreamingInsertsMetrics(ref);
+    results.updateStreamingInsertsMetrics(ref, 5, 0);
 
     assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
     assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
@@ -62,7 +62,7 @@ public class StreamingInsertsMetricsTest {
     Instant t1 = Instant.now();
     results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
 
-    results.updateStreamingInsertsMetrics(null);
+    results.updateStreamingInsertsMetrics(null, 0, 0);
 
     assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
     assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
@@ -88,12 +88,11 @@ public class StreamingInsertsMetricsTest {
 
     StreamingInsertsMetrics results = 
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
     results.updateRetriedRowsWithStatus("INTERNAL", 10);
-    results.updateSuccessfulAndFailedRows(50, 30);
     results.updateRetriedRowsWithStatus("QuotaLimits", 10);
     results.updateRetriedRowsWithStatus("QuotaLimits", 5);
     results.updateRetriedRowsWithStatus("ServiceUnavailable", 5);
 
-    results.updateStreamingInsertsMetrics(ref);
+    results.updateStreamingInsertsMetrics(ref, 50, 30);
 
     String tableId = "datasets/d/tables/t";
     MetricName internalErrorRetriedMetricName =
@@ -129,7 +128,7 @@ public class StreamingInsertsMetricsTest {
     results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(30)), 
"PermissionDenied");
     results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(40)), 
"Unavailable");
 
-    results.updateStreamingInsertsMetrics(ref);
+    results.updateStreamingInsertsMetrics(ref, 5, 0);
 
     // Validate RPC latency metric.
     MetricName histogramName =
@@ -162,7 +161,7 @@ public class StreamingInsertsMetricsTest {
     StreamingInsertsMetrics results = 
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
     results.updateRetriedRowsWithStatus("INTERNAL", 10);
 
-    results.updateStreamingInsertsMetrics(ref);
+    results.updateStreamingInsertsMetrics(ref, 5, 0);
 
     String tableId = "datasets/d/tables/t";
     MetricName internalErrorRetriedMetricName =
@@ -172,7 +171,7 @@ public class StreamingInsertsMetricsTest {
 
     // Subsequent updates to this object should not update the underlying metrics.
     results.updateRetriedRowsWithStatus("INTERNAL", 10);
-    results.updateStreamingInsertsMetrics(ref);
+    results.updateStreamingInsertsMetrics(ref, 5, 0);
 
     testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName, 
10L);
   }

Reply via email to