m-trieu commented on code in PR #30320:
URL: https://github.com/apache/beam/pull/30320#discussion_r1491496610


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java:
##########
@@ -1258,6 +1278,10 @@ <T> long insertAll(
           }
         }
       }
+      streamingInsertsResults.failedRowsCount.getAndAdd(allErrors.size());

Review Comment:
   See the comment about making `updateStreamingInsertsMetrics` an instance method.
   This one could be something like:
   ```
   @AutoValue
   public abstract static class StreamingInsertsResults {
   ...
   void updateRowCount(int failedRowCount, int successRowCount) {
       failedRowsCount().getAndAdd(failedRowCount);
       successfulRowsCount().set(successRowCount - failedRowsCount().get());
   }
   }
   ...
   }
   ```
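
As a rough illustration of the instance-method idea, here is a minimal plain-Java sketch that leaves out AutoValue so it compiles without an annotation processor. `RowCountsSketch` and its accessors are hypothetical names, not code from the PR, and the sketch assumes the second argument is the number of rows attempted in the batch, so the successful count is derived by subtraction.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for StreamingInsertsResults (plain class, no AutoValue). */
final class RowCountsSketch {
  private final AtomicInteger failedRowsCount = new AtomicInteger();
  private final AtomicInteger successfulRowsCount = new AtomicInteger();

  /**
   * Records one batch. Assumption: attemptedRows is the total number of rows
   * sent, and failedRows of them failed; the rest are counted as successful.
   */
  void updateRowCount(int failedRows, int attemptedRows) {
    if (failedRows < 0 || failedRows > attemptedRows) {
      throw new IllegalArgumentException("failedRows must be in [0, attemptedRows]");
    }
    failedRowsCount.addAndGet(failedRows);
    successfulRowsCount.addAndGet(attemptedRows - failedRows);
  }

  int failedRows() {
    return failedRowsCount.get();
  }

  int successfulRows() {
    return successfulRowsCount.get();
  }
}
```

Callers then invoke `results.updateRowCount(...)` rather than reaching into the fields directly, which keeps the counting rule in one place.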



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -326,6 +337,83 @@ public static void reportFailedRPCMetrics(
     updateRpcLatencyMetric(c, method);
   }
 
+  /**
+   * Results of a batch of InsertAll RPCs. Used to update metrics with the 
{@code
+   * updateStreamingInsertsMetrics} method. Member variables should be 
thread-safe.
+   */
+  public static class StreamingInsertsResults {

Review Comment:
   if this is a data class, can you use AutoValue? 
   
https://g3doc.corp.google.com/third_party/java_src/auto/value/g3doc/index.md?cl=head
   
   can also annotate with `@ThreadSafe`
   
   Also, the members are thread-safe individually, but atomic operations across multiple members still require a lock (`synchronized (someObject)`).
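
To make the cross-member point concrete, here is a minimal sketch, assuming a hypothetical `PairedCountsSketch` class that is not part of the PR. Both fields are guarded by one lock, so a reader never observes a state where one count has been updated and the other has not; two independent `AtomicInteger` fields cannot give that guarantee on their own.

```java
/** Hypothetical example: two counters that must always be read as a consistent pair. */
final class PairedCountsSketch {
  private final Object lock = new Object();
  // Both fields are only touched while holding `lock`, so plain ints suffice.
  private int successfulRows;
  private int failedRows;

  /** Updates both counters as one atomic step. */
  void record(int succeeded, int failed) {
    synchronized (lock) {
      successfulRows += succeeded;
      failedRows += failed;
    }
  }

  /** Returns {successfulRows, failedRows} taken at a single point in time. */
  int[] snapshot() {
    synchronized (lock) {
      return new int[] {successfulRows, failedRows};
    }
  }
}
```

If the counters are only ever read one at a time, the individually thread-safe members are enough and the lock can be dropped.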



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -326,6 +337,83 @@ public static void reportFailedRPCMetrics(
     updateRpcLatencyMetric(c, method);
   }
 
+  /**
+   * Results of a batch of InsertAll RPCs. Used to update metrics with the 
{@code
+   * updateStreamingInsertsMetrics} method. Member variables should be 
thread-safe.
+   */
+  public static class StreamingInsertsResults {
+    final ConcurrentLinkedQueue<java.time.Duration> rpcLatencies = new 
ConcurrentLinkedQueue<>();
+    final ConcurrentLinkedQueue<String> rpcStatus = new 
ConcurrentLinkedQueue<>();
+    // Represents <Rpc Status, Number of Rows> for rows that are retried 
because the InsertAll RPC
+    // failed.
+    final ConcurrentLinkedQueue<KV<String, Integer>> retriedRowsByStatus =
+        new ConcurrentLinkedQueue<>();
+    final AtomicInteger successfulRowsCount = new AtomicInteger();
+    final AtomicInteger failedRowsCount = new AtomicInteger();
+    // Rows that were retried due to an internal BigQuery Error even though 
the InsertAll RPC
+    // succeeded.
+    final AtomicInteger internalRetriedRowsCount = new AtomicInteger();
+  }
+
+  /**
+   * Update the {@code RpcRequestsCount}, {@code RpcLatency}, and {@code 
RowsAppendedCount} metrics
+   * based on the results of StreamingInserts RPC calls.
+   *
+   * @param results Result of StreamingInsert RPC calls
+   * @param tableRef BigQuery table that was written to.
+   */

Review Comment:
   How about making this an instance method within `StreamingInsertsResults`?
   
   It can be like:
   ```
   public void updateMetrics(@Nullable TableReference tableRef) {
   
   // ... operate on the members instead of a passed-in StreamingInsertsResults.
   
   }
   ```
   
   this can be done even with AutoValue



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -326,6 +337,83 @@ public static void reportFailedRPCMetrics(
     updateRpcLatencyMetric(c, method);
   }
 
+  /**
+   * Results of a batch of InsertAll RPCs. Used to update metrics with the 
{@code
+   * updateStreamingInsertsMetrics} method. Member variables should be 
thread-safe.
+   */
+  public static class StreamingInsertsResults {
+    final ConcurrentLinkedQueue<java.time.Duration> rpcLatencies = new 
ConcurrentLinkedQueue<>();
+    final ConcurrentLinkedQueue<String> rpcStatus = new 
ConcurrentLinkedQueue<>();
+    // Represents <Rpc Status, Number of Rows> for rows that are retried 
because the InsertAll RPC
+    // failed.
+    final ConcurrentLinkedQueue<KV<String, Integer>> retriedRowsByStatus =
+        new ConcurrentLinkedQueue<>();
+    final AtomicInteger successfulRowsCount = new AtomicInteger();
+    final AtomicInteger failedRowsCount = new AtomicInteger();
+    // Rows that were retried due to an internal BigQuery Error even though 
the InsertAll RPC
+    // succeeded.
+    final AtomicInteger internalRetriedRowsCount = new AtomicInteger();
+  }
+
+  /**
+   * Update the {@code RpcRequestsCount}, {@code RpcLatency}, and {@code 
RowsAppendedCount} metrics
+   * based on the results of StreamingInserts RPC calls.
+   *
+   * @param results Result of StreamingInsert RPC calls
+   * @param tableRef BigQuery table that was written to.
+   */
+  public static void updateStreamingInsertsMetrics(

Review Comment:
   When would the inputs be null?
   If they can be, mark `results` and `tableRef` as `@Nullable`, like
   `(@Nullable StreamingInsertsResults results, @Nullable TableReference tableRef)`
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -326,6 +337,83 @@ public static void reportFailedRPCMetrics(
     updateRpcLatencyMetric(c, method);
   }
 
+  /**
+   * Results of a batch of InsertAll RPCs. Used to update metrics with the 
{@code
+   * updateStreamingInsertsMetrics} method. Member variables should be 
thread-safe.
+   */
+  public static class StreamingInsertsResults {
+    final ConcurrentLinkedQueue<java.time.Duration> rpcLatencies = new 
ConcurrentLinkedQueue<>();
+    final ConcurrentLinkedQueue<String> rpcStatus = new 
ConcurrentLinkedQueue<>();
+    // Represents <Rpc Status, Number of Rows> for rows that are retried 
because the InsertAll RPC
+    // failed.
+    final ConcurrentLinkedQueue<KV<String, Integer>> retriedRowsByStatus =
+        new ConcurrentLinkedQueue<>();
+    final AtomicInteger successfulRowsCount = new AtomicInteger();
+    final AtomicInteger failedRowsCount = new AtomicInteger();
+    // Rows that were retried due to an internal BigQuery Error even though 
the InsertAll RPC
+    // succeeded.
+    final AtomicInteger internalRetriedRowsCount = new AtomicInteger();
+  }
+
+  /**
+   * Update the {@code RpcRequestsCount}, {@code RpcLatency}, and {@code 
RowsAppendedCount} metrics
+   * based on the results of StreamingInserts RPC calls.
+   *
+   * @param results Result of StreamingInsert RPC calls
+   * @param tableRef BigQuery table that was written to.
+   */
+  public static void updateStreamingInsertsMetrics(
+      StreamingInsertsResults results, TableReference tableRef) {
+    if (results == null || tableRef == null) {
+      return;
+    }
+
+    String shortTableId =
+        String.format("datasets/%s/tables/%s", tableRef.getDatasetId(), 
tableRef.getTableId());
+    Map<String, Integer> rpcRequetsRpcStatusMap = new HashMap<>();

Review Comment:
   See the comment above about atomicity for the concurrent collections.
   They allow concurrent writes to the data structure, i.e. you could be iterating/reading here while updates are happening to the data structure, without a ConcurrentModificationException. They do not guarantee that you will see every update while iterating. From the javadoc: "Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw [ConcurrentModificationException](https://docs.oracle.com/javase/7/docs/api/java/util/ConcurrentModificationException.html), and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once."
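
The difference from a fail-fast collection can be seen in a small sketch. `WeakIterationSketch` and `throwsOnConcurrentAdd` are hypothetical names used only for this illustration; the method appends to the collection while iterating over it and reports whether a `ConcurrentModificationException` was thrown.

```java
import java.util.Collection;
import java.util.ConcurrentModificationException;

/** Hypothetical demo of fail-fast versus weakly consistent iteration. */
final class WeakIterationSketch {
  /** Appends to the collection during iteration; returns true if that threw. */
  static boolean throwsOnConcurrentAdd(Collection<String> statuses) {
    statuses.add("OK");
    statuses.add("OK");
    try {
      boolean added = false;
      for (String status : statuses) {
        if (!added) {
          // Structural modification while an iterator is live.
          statuses.add("UNAVAILABLE");
          added = true;
        }
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }
}
```

With an `ArrayList` the method returns `true`; with a `ConcurrentLinkedQueue` it returns `false`. Whether the loop also observes the appended element is not something the calling code should rely on.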
   
   If you want to tie the operations together (iterating over `results.rpcStatus` and `results.retriedRowsByStatus`), and I'm not sure you do, you will need to wrap the iteration in a lock, like:
   
   ```
   // If this is an instance method in StreamingInsertsResults, you can use synchronized (this).
   synchronized (someObject) {
       for (String status : results.rpcStatus) {
         Integer currentVal = rpcRequetsRpcStatusMap.getOrDefault(status, 0);
         rpcRequetsRpcStatusMap.put(status, currentVal + 1);
       }
   
       for (KV<String, Integer> retryCountByStatus : results.retriedRowsByStatus) {
         Integer currentVal = retriedRowsRpcStatusMap.getOrDefault(retryCountByStatus.getKey(), 0);
         retriedRowsRpcStatusMap.put(
             retryCountByStatus.getKey(), currentVal + retryCountByStatus.getValue());
       }
   }
   ```
   
   Depending on the behavior you want, if you don't need to tie these two operations together, then it's OK to leave as is.
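
One caveat worth spelling out: a `synchronized` block around the reads only ties the two iterations together if the code that appends to the queues holds the same lock. Here is a minimal sketch, assuming a hypothetical `RetryResultsSketch` class (not the PR's code, and using `Map.Entry` in place of Beam's `KV`), where each RPC outcome is recorded under the lock and summarized under the same lock.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical results holder whose writers and reader share one lock. */
final class RetryResultsSketch {
  private final ConcurrentLinkedQueue<String> rpcStatus = new ConcurrentLinkedQueue<>();
  private final ConcurrentLinkedQueue<Map.Entry<String, Integer>> retriedRowsByStatus =
      new ConcurrentLinkedQueue<>();

  /** Writers take the same lock as the reader, so both queues change together. */
  synchronized void recordFailedRpc(String status, int retriedRows) {
    rpcStatus.add(status);
    retriedRowsByStatus.add(new SimpleImmutableEntry<>(status, retriedRows));
  }

  /** Returns {rpc count, retried row count} per status from one consistent view. */
  synchronized Map<String, int[]> summarize() {
    Map<String, int[]> summary = new HashMap<>();
    for (String status : rpcStatus) {
      summary.computeIfAbsent(status, s -> new int[2])[0]++;
    }
    for (Map.Entry<String, Integer> entry : retriedRowsByStatus) {
      summary.computeIfAbsent(entry.getKey(), s -> new int[2])[1] += entry.getValue();
    }
    return summary;
  }
}
```

If the writers append without taking the lock, the reader's `synchronized` block excludes only other readers, and the two loops can still see the queues at different points in time.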


