JayajP commented on code in PR #30320:
URL: https://github.com/apache/beam/pull/30320#discussion_r1493041376


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -326,6 +337,83 @@ public static void reportFailedRPCMetrics(
     updateRpcLatencyMetric(c, method);
   }
 
+  /**
+   * Results of a batch of InsertAll RPCs. Used to update metrics with the 
{@code
+   * updateStreamingInsertsMetrics} method. Member variables should be 
thread-safe.
+   */
+  public static class StreamingInsertsResults {
+    final ConcurrentLinkedQueue<java.time.Duration> rpcLatencies = new 
ConcurrentLinkedQueue<>();
+    final ConcurrentLinkedQueue<String> rpcStatus = new 
ConcurrentLinkedQueue<>();
+    // Represents <Rpc Status, Number of Rows> for rows that are retried 
because the InsertAll RPC
+    // failed.
+    final ConcurrentLinkedQueue<KV<String, Integer>> retriedRowsByStatus =
+        new ConcurrentLinkedQueue<>();
+    final AtomicInteger successfulRowsCount = new AtomicInteger();
+    final AtomicInteger failedRowsCount = new AtomicInteger();
+    // Rows that were retried due to an internal BigQuery Error even though 
the InsertAll RPC
+    // succeeded.
+    final AtomicInteger internalRetriedRowsCount = new AtomicInteger();
+  }
+
+  /**
+   * Update the {@code RpcRequestsCount}, {@code RpcLatency}, and {@code 
RowsAppendedCount} metrics
+   * based on the results of StreamingInserts RPC calls.
+   *
+   * @param results Result of StreamingInsert RPC calls
+   * @param tableRef BigQuery table that was written to.
+   */
+  public static void updateStreamingInsertsMetrics(
+      StreamingInsertsResults results, TableReference tableRef) {
+    if (results == null || tableRef == null) {
+      return;
+    }
+
+    String shortTableId =
+        String.format("datasets/%s/tables/%s", tableRef.getDatasetId(), 
tableRef.getTableId());
+    Map<String, Integer> rpcRequetsRpcStatusMap = new HashMap<>();

Review Comment:
   Don't need synchronization across member variables



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to