This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e7a6ab750 Update BigQuerySinkMetrics for StreamingInserts. (#30320)
32e7a6ab750 is described below

commit 32e7a6ab750a11c589f92393def98d7af9555051
Author: JayajP <[email protected]>
AuthorDate: Thu Mar 21 14:35:57 2024 -0700

    Update BigQuerySinkMetrics for StreamingInserts. (#30320)
    
    * Update BigQuerySinkMetrics for StreamingInserts.
    
    * Spotless/add additional comments
    
    * Address comments
    
    * Address comments and some minor optimizations
    
    Revert changes to histogram bucket widths
    
    * Add functionality to completely disable new streaming inserts metrics
    
    * Remove unnecessary qualifiers in StreamingInsertsMetrics
    
    * fix unit tests
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  21 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  30 ++-
 .../sdk/io/gcp/bigquery/BigQuerySinkMetrics.java   |  30 ++-
 .../io/gcp/bigquery/StreamingInsertsMetrics.java   | 234 +++++++++++++++++++++
 .../io/gcp/bigquery/BigQueryServicesImplTest.java  | 163 ++++++++++++++
 .../io/gcp/bigquery/BigQuerySinkMetricsTest.java   |  34 +++
 .../gcp/bigquery/StreamingInsertsMetricsTest.java  | 179 ++++++++++++++++
 7 files changed, 674 insertions(+), 17 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 4c3ffd08a0b..c3e820767cd 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -579,13 +579,10 @@ public class StreamingDataflowWorker {
     // metrics.
     MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
 
-    // When enabled, the Pipeline will record Per-Worker metrics that will be 
piped to DFE.
-    StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
-        options.isEnableStreamingEngine()
-            && DataflowRunner.hasExperiment(options, 
"enable_per_worker_metrics"));
-    // StreamingStepMetricsContainer automatically deletes perWorkerCounters 
if they are zero-valued
-    // for longer than 5 minutes.
-    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+    if (options.isEnableStreamingEngine()
+        && DataflowRunner.hasExperiment(options, "enable_per_worker_metrics")) 
{
+      enableBigQueryMetrics();
+    }
 
     JvmInitializers.runBeforeProcessing(options);
     worker.startStatusPages();
@@ -672,6 +669,16 @@ public class StreamingDataflowWorker {
     return maxMem > 0 ? maxMem : (Runtime.getRuntime().maxMemory() / 2);
   }
 
+  private static void enableBigQueryMetrics() {
+    // When enabled, the Pipeline will record Per-Worker metrics that will be 
piped to DFE.
+    StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
+    // StreamingStepMetricsContainer automatically deletes perWorkerCounters 
if they are zero-valued
+    // for longer than 5 minutes.
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+    // Support metrics for BigQuery's Streaming Inserts write method.
+    BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+  }
+
   void addStateNameMappings(Map<String, String> nameMap) {
     stateNameMap.putAll(nameMap);
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index bb3b99f6fcd..2dbc02131f5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -95,6 +95,7 @@ import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.protobuf.ProtoUtils;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -937,6 +938,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
       private final List<TableDataInsertAllRequest.Rows> rows;
       private final AtomicLong maxThrottlingMsec;
       private final Sleeper sleeper;
+      private final StreamingInsertsMetrics result;
 
       InsertBatchofRowsCallable(
           TableReference ref,
@@ -946,7 +948,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
           FluentBackoff rateLimitBackoffFactory,
           List<TableDataInsertAllRequest.Rows> rows,
           AtomicLong maxThrottlingMsec,
-          Sleeper sleeper) {
+          Sleeper sleeper,
+          StreamingInsertsMetrics result) {
         this.ref = ref;
         this.skipInvalidRows = skipInvalidRows;
         this.ignoreUnkownValues = ignoreUnknownValues;
@@ -955,6 +958,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
         this.rows = rows;
         this.maxThrottlingMsec = maxThrottlingMsec;
         this.sleeper = sleeper;
+        this.result = result;
       }
 
       @Override
@@ -975,6 +979,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
         long totalBackoffMillis = 0L;
         while (true) {
           ServiceCallMetric serviceCallMetric = 
BigQueryUtils.writeCallMetric(ref);
+          Instant start = Instant.now();
           try {
             List<TableDataInsertAllResponse.InsertErrors> response =
                 insert.execute().getInsertErrors();
@@ -987,14 +992,18 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                 }
               }
             }
+            result.updateSuccessfulRpcMetrics(start, Instant.now());
             return response;
           } catch (IOException e) {
             GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
             if (errorInfo == null) {
               
serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+              result.updateFailedRpcMetrics(start, start, 
BigQuerySinkMetrics.UNKNOWN);
               throw e;
             }
-            serviceCallMetric.call(errorInfo.getReason());
+            String errorReason = errorInfo.getReason();
+            serviceCallMetric.call(errorReason);
+            result.updateFailedRpcMetrics(start, Instant.now(), errorReason);
             /**
              * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be 
replaced by
              * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next 
release of
@@ -1031,6 +1040,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
               totalBackoffMillis += nextBackOffMillis;
               final long totalBackoffMillisSoFar = totalBackoffMillis;
               maxThrottlingMsec.getAndUpdate(current -> Math.max(current, 
totalBackoffMillisSoFar));
+              result.updateRetriedRowsWithStatus(errorReason, rows.size());
             } catch (InterruptedException interrupted) {
               throw new IOException("Interrupted while waiting before retrying 
insertAll");
             }
@@ -1067,7 +1077,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
             "If insertIdList is not null it needs to have at least "
                 + "as many elements as rowList");
       }
-
+      StreamingInsertsMetrics streamingInsertsResults =
+          BigQuerySinkMetrics.streamingInsertsMetrics();
       final Set<Integer> failedIndices = new HashSet<>();
       long retTotalDataSize = 0;
       List<TableDataInsertAllResponse.InsertErrors> allErrors = new 
ArrayList<>();
@@ -1124,6 +1135,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                           + " pipeline, and the row will be output as a failed 
insert.",
                       nextRowSize));
             } else {
+              streamingInsertsResults.incrementFailedRows();
               errorContainer.add(failedInserts, error, ref, 
rowsToPublish.get(rowIndex));
               failedIndices.add(rowIndex);
               rowIndex++;
@@ -1150,7 +1162,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                         rateLimitBackoffFactory,
                         rows,
                         maxThrottlingMsec,
-                        sleeper)));
+                        sleeper,
+                        streamingInsertsResults)));
             strideIndices.add(strideIndex);
             retTotalDataSize += dataSize;
             strideIndex = rowIndex;
@@ -1180,7 +1193,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                       rateLimitBackoffFactory,
                       rows,
                       maxThrottlingMsec,
-                      sleeper)));
+                      sleeper,
+                      streamingInsertsResults)));
           strideIndices.add(strideIndex);
           retTotalDataSize += dataSize;
           rows = new ArrayList<>();
@@ -1209,6 +1223,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                   retryIds.add(idsToPublish.get(errorIndex));
                 }
               } else {
+                streamingInsertsResults.incrementFailedRows();
                 errorContainer.add(failedInserts, error, ref, 
rowsToPublish.get(errorIndex));
               }
             }
@@ -1219,6 +1234,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
           Thread.currentThread().interrupt();
           throw new IOException("Interrupted while inserting " + 
rowsToPublish);
         } catch (ExecutionException e) {
+          streamingInsertsResults.updateStreamingInsertsMetrics(ref);
           throw new RuntimeException(e.getCause());
         }
 
@@ -1237,6 +1253,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
         }
         rowsToPublish = retryRows;
         idsToPublish = retryIds;
+        streamingInsertsResults.updateRetriedRowsWithStatus(
+            BigQuerySinkMetrics.INTERNAL, retryRows.size());
         // print first 5 failures
         int numErrorToLog = Math.min(allErrors.size(), 5);
         LOG.info(
@@ -1258,6 +1276,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
           }
         }
       }
+      streamingInsertsResults.updateSuccessfulAndFailedRows(rowList.size(), 
allErrors.size());
+      streamingInsertsResults.updateStreamingInsertsMetrics(ref);
       if (!allErrors.isEmpty()) {
         throw new IOException("Insert failed: " + allErrors);
       } else {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
index 0375cf9ab33..392a1d16404 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.metrics.DelegatingHistogram;
 import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.util.HistogramData;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 
@@ -45,12 +46,14 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
  */
 public class BigQuerySinkMetrics {
   private static boolean supportMetricsDeletion = false;
+  private static boolean supportStreamingInsertsMetrics = false;
 
   public static final String METRICS_NAMESPACE = "BigQuerySink";
 
   // Status codes
-  private static final String UNKNOWN = Status.Code.UNKNOWN.toString();
+  public static final String UNKNOWN = Status.Code.UNKNOWN.toString();
   public static final String OK = Status.Code.OK.toString();
+  static final String INTERNAL = "INTERNAL";
   public static final String PAYLOAD_TOO_LARGE = "PayloadTooLarge";
 
   // Base Metric names
@@ -59,8 +62,9 @@ public class BigQuerySinkMetrics {
   private static final String APPEND_ROWS_ROW_STATUS = "RowsAppendedCount";
   public static final String THROTTLED_TIME = "ThrottledTime";
 
-  // StorageWriteAPI Method names
+  // BigQuery Write Method names
   public enum RpcMethod {
+    STREAMING_INSERTS,
     APPEND_ROWS,
     FLUSH_ROWS,
     FINALIZE_STREAM
@@ -167,8 +171,8 @@ public class BigQuerySinkMetrics {
    *     'RpcRequests-Method:{method}RpcStatus:{status};TableId:{tableId}' 
TableId label is dropped
    *     if 'supportsMetricsDeletion' is not enabled.
    */
-  private static Counter createRPCRequestCounter(
-      RpcMethod method, String rpcStatus, String tableId) {
+  @VisibleForTesting
+  static Counter createRPCRequestCounter(RpcMethod method, String rpcStatus, 
String tableId) {
     NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
     metricLabels.put(RPC_STATUS_LABEL, rpcStatus);
     metricLabels.put(RPC_METHOD, method.toString());
@@ -189,7 +193,7 @@ public class BigQuerySinkMetrics {
    * @param method StorageWriteAPI method associated with this metric.
    * @return Histogram with exponential buckets with a sqrt(2) growth factor.
    */
-  private static Histogram createRPCLatencyHistogram(RpcMethod method) {
+  static Histogram createRPCLatencyHistogram(RpcMethod method) {
     NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
     metricLabels.put(RPC_METHOD, method.toString());
     String fullMetricName = createLabeledMetricName(RPC_LATENCY, metricLabels);
@@ -326,6 +330,22 @@ public class BigQuerySinkMetrics {
     updateRpcLatencyMetric(c, method);
   }
 
+  /**
+   * Returns a container to store metrics for BigQuery's {@code Streaming 
Inserts} RPC. If these
+   * metrics are disabled, then we return a no-op container.
+   */
+  static StreamingInsertsMetrics streamingInsertsMetrics() {
+    if (supportStreamingInsertsMetrics) {
+      return StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+    } else {
+      return StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance();
+    }
+  }
+
+  public static void setSupportStreamingInsertsMetrics(boolean 
supportStreamingInsertsMetrics) {
+    BigQuerySinkMetrics.supportStreamingInsertsMetrics = 
supportStreamingInsertsMetrics;
+  }
+
   public static void setSupportMetricsDeletion(boolean supportMetricsDeletion) 
{
     BigQuerySinkMetrics.supportMetricsDeletion = supportMetricsDeletion;
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
new file mode 100644
index 00000000000..998c9af945b
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.metrics.Histogram;
+import org.apache.beam.sdk.values.KV;
+
+/** Stores and exports metrics for a batch of Streaming Inserts RPCs. */
+public interface StreamingInsertsMetrics {
+
+  void updateRetriedRowsWithStatus(String status, int retriedRows);
+
+  void updateFailedRpcMetrics(Instant start, Instant end, String status);
+
+  void updateSuccessfulRpcMetrics(Instant start, Instant end);
+
+  void incrementFailedRows();
+
+  void updateSuccessfulAndFailedRows(int totalRows, int failedRows);
+
+  void updateStreamingInsertsMetrics(@Nullable TableReference tableRef);
+
+  /** No-op implementation of {@code StreamingInsertsResults}. */
+  class NoOpStreamingInsertsMetrics implements StreamingInsertsMetrics {
+    private NoOpStreamingInsertsMetrics() {}
+
+    @Override
+    public void updateRetriedRowsWithStatus(String status, int retriedRows) {}
+
+    @Override
+    public void updateFailedRpcMetrics(Instant start, Instant end, String 
status) {}
+
+    @Override
+    public void updateSuccessfulRpcMetrics(Instant start, Instant end) {}
+
+    @Override
+    public void incrementFailedRows() {}
+
+    @Override
+    public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {}
+
+    @Override
+    public void updateStreamingInsertsMetrics(@Nullable TableReference 
tableRef) {}
+
+    private static NoOpStreamingInsertsMetrics singleton = new 
NoOpStreamingInsertsMetrics();
+
+    static NoOpStreamingInsertsMetrics getInstance() {
+      return singleton;
+    }
+  }
+
+  /**
+   * Metrics of a batch of InsertAll RPCs. Member variables are thread safe; 
however, this class
+   * does not have atomicity across member variables.
+   *
+   * <p>Expected usage: A number of threads record metrics in an instance of 
this class with the
+   * member methods. Afterwards, a single thread should call {@code 
updateStreamingInsertsMetrics}
+   * which will export all counters metrics and RPC latency distribution 
metrics to the underlying
+   * {@code perWorkerMetrics} container. Afterwards, metrics should not be 
written/read from this
+   * object.
+   */
+  @AutoValue
+  abstract class StreamingInsertsMetricsImpl implements 
StreamingInsertsMetrics {
+    abstract ConcurrentLinkedQueue<java.time.Duration> rpcLatencies();
+
+    abstract ConcurrentLinkedQueue<String> rpcErrorStatus();
+    // Represents <Rpc Status, Number of Rows> for rows that are retried 
because of a failed
+    // InsertAll RPC.
+    abstract ConcurrentLinkedQueue<KV<String, Integer>> retriedRowsByStatus();
+
+    abstract AtomicInteger successfulRpcsCount();
+
+    abstract AtomicInteger successfulRowsCount();
+
+    abstract AtomicInteger failedRowsCount();
+
+    abstract AtomicBoolean isWritable();
+
+    public static StreamingInsertsMetricsImpl create() {
+      return new AutoValue_StreamingInsertsMetrics_StreamingInsertsMetricsImpl(
+          new ConcurrentLinkedQueue<>(),
+          new ConcurrentLinkedQueue<>(),
+          new ConcurrentLinkedQueue<>(),
+          new AtomicInteger(),
+          new AtomicInteger(),
+          new AtomicInteger(),
+          new AtomicBoolean(true));
+    }
+
+    /** Update metrics for rows that were retried due to an RPC error. */
+    @Override
+    public void updateRetriedRowsWithStatus(String status, int retriedRows) {
+      if (isWritable().get()) {
+        retriedRowsByStatus().add(KV.of(status, retriedRows));
+      }
+    }
+
+    /** Record the rpc status and latency of a failed StreamingInserts RPC 
call. */
+    @Override
+    public void updateFailedRpcMetrics(Instant start, Instant end, String 
status) {
+      if (isWritable().get()) {
+        rpcErrorStatus().add(status);
+        rpcLatencies().add(Duration.between(start, end));
+      }
+    }
+
+    /** Record the rpc status and latency of a successful StreamingInserts RPC 
call. */
+    @Override
+    public void updateSuccessfulRpcMetrics(Instant start, Instant end) {
+      if (isWritable().get()) {
+        successfulRpcsCount().getAndIncrement();
+        rpcLatencies().add(Duration.between(start, end));
+      }
+    }
+
+    /** Increment the failed rows count by one. */
+    @Override
+    public void incrementFailedRows() {
+      if (isWritable().get()) {
+        failedRowsCount().getAndIncrement();
+      }
+    }
+
+    /** Increment the failed rows count, and set the successful rows count. */
+    @Override
+    public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {
+      if (isWritable().get()) {
+        failedRowsCount().getAndAdd(failedRows);
+        successfulRowsCount().set(totalRows - failedRowsCount().get());
+      }
+    }
+
+    /**
+     * Export all metrics recorded in this instance to the underlying {@code 
perWorkerMetrics}
+     * containers. This function will only report metrics once per instance. 
Subsequent calls to
+     * this function will no-op.
+     *
+     * @param tableRef BigQuery table that was written to, return early if 
null.
+     */
+    @Override
+    public void updateStreamingInsertsMetrics(@Nullable TableReference 
tableRef) {
+      if (!isWritable().compareAndSet(true, false)) {
+        // Metrics have already been exported.
+        return;
+      }
+
+      if (tableRef == null) {
+        return;
+      }
+
+      String shortTableId =
+          String.join("/", "datasets", tableRef.getDatasetId(), "tables", 
tableRef.getTableId());
+
+      Map<String, Integer> rpcRequetsRpcStatusMap = new HashMap<>();
+      Map<String, Integer> retriedRowsRpcStatusMap = new HashMap<>();
+
+      for (String status : rpcErrorStatus()) {
+        Integer currentVal = rpcRequetsRpcStatusMap.getOrDefault(status, 0);
+        rpcRequetsRpcStatusMap.put(status, currentVal + 1);
+      }
+
+      for (KV<String, Integer> retryCountByStatus : retriedRowsByStatus()) {
+        Integer currentVal = 
retriedRowsRpcStatusMap.getOrDefault(retryCountByStatus.getKey(), 0);
+        retriedRowsRpcStatusMap.put(
+            retryCountByStatus.getKey(), currentVal + 
retryCountByStatus.getValue());
+      }
+
+      for (Entry<String, Integer> entry : rpcRequetsRpcStatusMap.entrySet()) {
+        BigQuerySinkMetrics.createRPCRequestCounter(
+                BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS, 
entry.getKey(), shortTableId)
+            .inc(entry.getValue());
+      }
+
+      for (Entry<String, Integer> entry : retriedRowsRpcStatusMap.entrySet()) {
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(
+                BigQuerySinkMetrics.RowStatus.RETRIED, entry.getKey(), 
shortTableId)
+            .inc(entry.getValue());
+      }
+
+      if (successfulRpcsCount().get() != 0) {
+        BigQuerySinkMetrics.createRPCRequestCounter(
+                BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS,
+                BigQuerySinkMetrics.OK,
+                shortTableId)
+            .inc(successfulRpcsCount().longValue());
+      }
+
+      if (failedRowsCount().get() != 0) {
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(
+                BigQuerySinkMetrics.RowStatus.FAILED, 
BigQuerySinkMetrics.INTERNAL, shortTableId)
+            .inc(failedRowsCount().longValue());
+      }
+
+      if (successfulRowsCount().get() != 0) {
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(
+                BigQuerySinkMetrics.RowStatus.SUCCESSFUL, 
BigQuerySinkMetrics.OK, shortTableId)
+            .inc(successfulRowsCount().longValue());
+      }
+
+      Histogram latencyHistogram =
+          BigQuerySinkMetrics.createRPCLatencyHistogram(
+              BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
+      for (Duration latency : rpcLatencies()) {
+        latencyHistogram.update(latency.toMillis());
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index a8e1ad52237..eb4cff1a596 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
@@ -97,6 +98,7 @@ import 
org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics.RowStatus;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -832,6 +834,167 @@ public class BigQueryServicesImplTest {
     verifyWriteMetricWasSet("project", "dataset", "table", "quotaexceeded", 1);
   }
 
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} sets perWorkerMetrics 
properly when the Backoff
+   * limit is hit.
+   */
+  @Test
+  public void testInsertAll_InternalBigQueryErrors() throws Exception {
+    BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+    BigQuerySinkMetricsTest.TestMetricsContainer testMetricsContainer =
+        new BigQuerySinkMetricsTest.TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testMetricsContainer);
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+
+    TableReference ref =
+        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+        ImmutableList.of(
+            wrapValue(new TableRow().set("row", "a")), wrapValue(new 
TableRow().set("row", "b")));
+    List<String> insertIds = ImmutableList.of("a", "b");
+
+    final TableDataInsertAllResponse aFailed =
+        new TableDataInsertAllResponse()
+            .setInsertErrors(
+                ImmutableList.of(
+                    new 
InsertErrors().setIndex(0L).setErrors(ImmutableList.of(new ErrorProto()))));
+    MockSetupFunction aFailedResponse =
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(aFailed));
+        };
+
+    // Expect 4 InsertAll RPCS:
+    // 1st request will return an RPC error that should be retried
+    // 2nd-4th RPCs will fail to insert Row A.
+    // Expect client throw an exception after 4th RPC due to hitting backoff 
limits.
+    setupMockResponses(
+        response -> {
+          when(response.getStatusCode()).thenReturn(403);
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getContent())
+              
.thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)));
+        },
+        aFailedResponse,
+        aFailedResponse,
+        aFailedResponse,
+        aFailedResponse);
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    try {
+      dataService.insertAll(
+          ref,
+          rows,
+          insertIds,
+          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+          TEST_BACKOFF,
+          new MockSleeper(),
+          InsertRetryPolicy.alwaysRetry(),
+          null,
+          null,
+          false,
+          false,
+          false,
+          null);
+      fail();
+    } catch (Exception e) {
+    }
+    verifyAllResponsesAreRead();
+
+    String tableId = "datasets/dataset/tables/table";
+    BigQuerySinkMetrics.RpcMethod method = 
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS;
+    MetricName okRpcRequestName =
+        BigQuerySinkMetrics.createRPCRequestCounter(method, "OK", 
tableId).getName();
+    MetricName rateLimitRpcRequestName =
+        BigQuerySinkMetrics.createRPCRequestCounter(method, 
"rateLimitExceeded", tableId).getName();
+
+    MetricName okRowsAppendedName =
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(RowStatus.SUCCESSFUL, 
"OK", tableId)
+            .getName();
+    MetricName internalRowsAppendedName =
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(RowStatus.RETRIED, 
"INTERNAL", tableId)
+            .getName();
+    MetricName rateLimitRowsAppendedName =
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(
+                RowStatus.RETRIED, "rateLimitExceeded", tableId)
+            .getName();
+    MetricName failedRowsName =
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(RowStatus.FAILED, 
"INTERNAL", tableId)
+            .getName();
+
+    testMetricsContainer.assertPerWorkerCounterValue(okRpcRequestName, 4L);
+    testMetricsContainer.assertPerWorkerCounterValue(rateLimitRpcRequestName, 
1L);
+    testMetricsContainer.assertPerWorkerCounterValue(okRowsAppendedName, 1L);
+    testMetricsContainer.assertPerWorkerCounterValue(internalRowsAppendedName, 
3L);
+    
testMetricsContainer.assertPerWorkerCounterValue(rateLimitRowsAppendedName, 2L);
+    testMetricsContainer.assertPerWorkerCounterValue(failedRowsName, 1L);
+
+    assertThat(testMetricsContainer.perWorkerHistograms.size(), equalTo(1));
+  }
+
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} sets perWorkerMetrics 
properly when BigQuery
+   * returns unretryable Rpc Error.
+   */
+  @Test
+  public void testInsertAll_RpcErrors() throws Exception {
+    BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+    BigQuerySinkMetricsTest.TestMetricsContainer testMetricsContainer =
+        new BigQuerySinkMetricsTest.TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testMetricsContainer);
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+
+    TableReference ref =
+        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+        ImmutableList.of(
+            wrapValue(new TableRow().set("row", "a")), wrapValue(new 
TableRow().set("row", "b")));
+    List<String> insertIds = ImmutableList.of("a", "b");
+
+    // Expect 1 RPC request
+    // 1st request will return an RPC error that is not retryable.
+    // Expect client to record metrics and rethrow the error.
+    setupMockResponses(
+        response -> {
+          when(response.getStatusCode()).thenReturn(403);
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getContent())
+              
.thenReturn(toStream(errorWithReasonAndStatus("actuallyForbidden", 403)));
+        });
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    try {
+      dataService.insertAll(
+          ref,
+          rows,
+          insertIds,
+          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+          TEST_BACKOFF,
+          new MockSleeper(),
+          InsertRetryPolicy.alwaysRetry(),
+          null,
+          null,
+          false,
+          false,
+          false,
+          null);
+      fail();
+    } catch (Exception e) {
+    }
+    verifyAllResponsesAreRead();
+
+    String tableId = "datasets/dataset/tables/table";
+    BigQuerySinkMetrics.RpcMethod method = 
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS;
+    MetricName forbiddenRpcName =
+        BigQuerySinkMetrics.createRPCRequestCounter(method, 
"actuallyForbidden", tableId).getName();
+
+    testMetricsContainer.assertPerWorkerCounterValue(forbiddenRpcName, 1L);
+    assertThat(testMetricsContainer.perWorkerHistograms.size(), equalTo(1));
+  }
+
   // A BackOff that makes a total of 4 attempts
   private static final FluentBackoff TEST_BACKOFF =
       FluentBackoff.DEFAULT
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 8b960a8c6c6..861378a41f8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -20,11 +20,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.sameInstance;
 
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.Exceptions;
 import io.grpc.Status;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -96,6 +99,21 @@ public class BigQuerySinkMetricsTest {
       perWorkerHistograms.clear();
       perWorkerCounters.clear();
     }
+
+    public void assertPerWorkerCounterValue(MetricName name, long value) 
throws Exception {
+      assertThat(perWorkerCounters, IsMapContaining.hasKey(name));
+      assertThat(perWorkerCounters.get(name).getCumulative(), equalTo(value));
+    }
+
+    public void assertPerWorkerHistogramValues(
+        MetricName name, HistogramData.BucketType bucketType, double... 
values) {
+      KV<MetricName, HistogramData.BucketType> kv = KV.of(name, bucketType);
+      assertThat(perWorkerHistograms, IsMapContaining.hasKey(kv));
+
+      Double[] objValues = 
Arrays.stream(values).boxed().toArray(Double[]::new);
+
+      assertThat(perWorkerHistograms.get(kv).values, 
containsInAnyOrder(objValues));
+    }
   }
 
   @Test
@@ -359,4 +377,20 @@ public class BigQuerySinkMetricsTest {
   public void testParseMetricName_emptyString() {
     assertThat(BigQuerySinkMetrics.parseMetricName("").isPresent(), 
equalTo(false));
   }
+
+  @Test
+  public void testStreamingInsertsMetrics_disabled() {
+    BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(false);
+    assertThat(
+        BigQuerySinkMetrics.streamingInsertsMetrics(),
+        
sameInstance(StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance()));
+  }
+
+  @Test
+  public void testStreamingInsertsMetrics_enabled() {
+    BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+    assertThat(
+        BigQuerySinkMetrics.streamingInsertsMetrics(),
+        instanceOf(StreamingInsertsMetrics.StreamingInsertsMetricsImpl.class));
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
new file mode 100644
index 00000000000..e04870c531f
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import com.google.api.services.bigquery.model.TableReference;
+import java.time.Duration;
+import java.time.Instant;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics.RowStatus;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.util.HistogramData;
+import org.junit.Test;
+
+public class StreamingInsertsMetricsTest {
+
+  @Test
+  public void testNoOpStreamingInsertsMetrics() throws Exception {
+    BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+        new BigQuerySinkMetricsTest.TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testContainer);
+
+    StreamingInsertsMetrics results =
+        StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance();
+    results.updateRetriedRowsWithStatus("INTERNAL", 0);
+    Instant t1 = Instant.now();
+    results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
+    TableReference ref = new 
TableReference().setTableId("t").setDatasetId("d");
+
+    results.updateStreamingInsertsMetrics(ref);
+
+    assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
+    assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
+  }
+
+  @Test
+  public void testUpdateStreamingInsertsMetrics_nullInput() throws Exception {
+    BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+        new BigQuerySinkMetricsTest.TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testContainer);
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+
+    StreamingInsertsMetrics results = 
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+    results.updateRetriedRowsWithStatus("INTERNAL", 0);
+    Instant t1 = Instant.now();
+    results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
+
+    results.updateStreamingInsertsMetrics(null);
+
+    assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
+    assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
+  }
+
+  MetricName getAppendRowsCounterName(
+      BigQuerySinkMetrics.RowStatus rowStatus, String rpcStatus, String 
tableId) {
+    return BigQuerySinkMetrics.appendRowsRowStatusCounter(rowStatus, 
rpcStatus, tableId).getName();
+  }
+
+  MetricName getRpcRequestsCounterName(
+      BigQuerySinkMetrics.RpcMethod method, String rpcStatus, String tableId) {
+    return BigQuerySinkMetrics.createRPCRequestCounter(method, rpcStatus, 
tableId).getName();
+  }
+
+  @Test
+  public void testUpdateStreamingInsertsMetrics_rowsAppendedCounter() throws 
Exception {
+    BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+        new BigQuerySinkMetricsTest.TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testContainer);
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+    TableReference ref = new 
TableReference().setTableId("t").setDatasetId("d");
+
+    StreamingInsertsMetrics results = 
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+    results.updateRetriedRowsWithStatus("INTERNAL", 10);
+    results.updateSuccessfulAndFailedRows(50, 30);
+    results.updateRetriedRowsWithStatus("QuotaLimits", 10);
+    results.updateRetriedRowsWithStatus("QuotaLimits", 5);
+    results.updateRetriedRowsWithStatus("ServiceUnavailable", 5);
+
+    results.updateStreamingInsertsMetrics(ref);
+
+    String tableId = "datasets/d/tables/t";
+    MetricName internalErrorRetriedMetricName =
+        getAppendRowsCounterName(RowStatus.RETRIED, "INTERNAL", tableId);
+    MetricName succssfulRowsMetricName =
+        getAppendRowsCounterName(RowStatus.SUCCESSFUL, "OK", tableId);
+    MetricName failedRowsMetricName =
+        getAppendRowsCounterName(RowStatus.FAILED, "INTERNAL", tableId);
+    MetricName retriedRowsQuotaMetricName =
+        getAppendRowsCounterName(RowStatus.RETRIED, "QuotaLimits", tableId);
+    MetricName retriedRowsUnavailableMetricName =
+        getAppendRowsCounterName(RowStatus.RETRIED, "ServiceUnavailable", 
tableId);
+
+    testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName, 
10L);
+    testContainer.assertPerWorkerCounterValue(succssfulRowsMetricName, 20L);
+    testContainer.assertPerWorkerCounterValue(failedRowsMetricName, 30L);
+    testContainer.assertPerWorkerCounterValue(retriedRowsQuotaMetricName, 15L);
+    
testContainer.assertPerWorkerCounterValue(retriedRowsUnavailableMetricName, 5L);
+  }
+
+  @Test
+  public void testUpdateStreamingInsertsMetrics_rpcLatencyHistogram() throws 
Exception {
+    BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+        new BigQuerySinkMetricsTest.TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testContainer);
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+    TableReference ref = new 
TableReference().setTableId("t").setDatasetId("d");
+
+    StreamingInsertsMetrics results = 
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+    Instant t1 = Instant.now();
+    results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
+    results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(20)));
+    results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(30)), 
"PermissionDenied");
+    results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(40)), 
"Unavailable");
+
+    results.updateStreamingInsertsMetrics(ref);
+
+    // Validate RPC latency metric.
+    MetricName histogramName =
+        MetricName.named("BigQuerySink", 
"RpcLatency*rpc_method:STREAMING_INSERTS;");
+    HistogramData.BucketType bucketType = 
HistogramData.ExponentialBuckets.of(1, 34);
+    testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 
10.0, 20.0, 30.0, 40.0);
+
+    // Validate RPC Status metric.
+    BigQuerySinkMetrics.RpcMethod m = 
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS;
+    String tableId = "datasets/d/tables/t";
+    MetricName okMetricName = getRpcRequestsCounterName(m, "OK", tableId);
+    MetricName permissionDeniedMetricName =
+        getRpcRequestsCounterName(m, "PermissionDenied", tableId);
+    MetricName unavailableMetricName = getRpcRequestsCounterName(m, 
"Unavailable", tableId);
+
+    testContainer.assertPerWorkerCounterValue(okMetricName, 2L);
+    testContainer.assertPerWorkerCounterValue(permissionDeniedMetricName, 1L);
+    testContainer.assertPerWorkerCounterValue(unavailableMetricName, 1L);
+  }
+
+  @Test
+  public void 
testUpdateStreamingInsertsMetrics_multipleUpdateStreamingInsertsMetrics()
+      throws Exception {
+    BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+        new BigQuerySinkMetricsTest.TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testContainer);
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+    TableReference ref = new 
TableReference().setTableId("t").setDatasetId("d");
+
+    StreamingInsertsMetrics results = 
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+    results.updateRetriedRowsWithStatus("INTERNAL", 10);
+
+    results.updateStreamingInsertsMetrics(ref);
+
+    String tableId = "datasets/d/tables/t";
+    MetricName internalErrorRetriedMetricName =
+        getAppendRowsCounterName(RowStatus.RETRIED, "INTERNAL", tableId);
+
+    testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName, 
10L);
+
+    // Subsequent updates to this object should update the underlying metrics.
+    results.updateRetriedRowsWithStatus("INTERNAL", 10);
+    results.updateStreamingInsertsMetrics(ref);
+
+    testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName, 
10L);
+  }
+}


Reply via email to