m-trieu commented on code in PR #30717:
URL: https://github.com/apache/beam/pull/30717#discussion_r1538300253


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java:
##########
@@ -60,19 +61,34 @@ public DelegatingHistogram(
     this.perWorkerHistogram = perWorkerHistogram;
   }
 
-  @Override
-  public void update(double value) {
+  private Optional<Histogram> getHistogram() {
     MetricsContainer container =
         processWideContainer
             ? MetricsEnvironment.getProcessWideContainer()
             : MetricsEnvironment.getCurrentContainer();
     if (container == null) {
-      return;
+      return Optional.empty();
     }
     if (perWorkerHistogram) {
-      container.getPerWorkerHistogram(name, bucketType).update(value);
+      return Optional.of(container.getPerWorkerHistogram(name, bucketType));
     } else {
-      container.getHistogram(name, bucketType).update(value);
+      return Optional.of(container.getHistogram(name, bucketType));
+    }
+  }
+
+  @Override
+  public void update(double value) {
+    Optional<Histogram> histogram = getHistogram();

Review Comment:
   nit: can write like `getHistogram.ifPresent(histogram -> 
histogram.update(value));`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java:
##########
@@ -156,6 +156,19 @@ public void updateSuccessfulAndFailedRows(int totalRows, 
int failedRows) {
       }
     }
 
+    /** Record rpc latency histogram metrics. */
+    private void recordRpcLatencyMetrics() {
+      Histogram latencyHistogram =
+          BigQuerySinkMetrics.createRPCLatencyHistogram(
+              BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
+      Object[] rpcLatencyObjs = rpcLatencies().toArray();
+      double[] rpcLatencies = new double[rpcLatencyObjs.length];

Review Comment:
   nit: might be wrong but can write like
   ```
   List<Double> rpcLatencies = rpcLatencies().stream().map(latencyObj -> 
((Duration) rpcLatencyObjs[i]).toMillis()).collection(toList());
   latencyHistogram.update(rpcLatencies);
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java:
##########
@@ -60,19 +61,34 @@ public DelegatingHistogram(
     this.perWorkerHistogram = perWorkerHistogram;
   }
 
-  @Override
-  public void update(double value) {
+  private Optional<Histogram> getHistogram() {
     MetricsContainer container =
         processWideContainer
             ? MetricsEnvironment.getProcessWideContainer()
             : MetricsEnvironment.getCurrentContainer();
     if (container == null) {
-      return;
+      return Optional.empty();
     }
     if (perWorkerHistogram) {
-      container.getPerWorkerHistogram(name, bucketType).update(value);
+      return Optional.of(container.getPerWorkerHistogram(name, bucketType));
     } else {
-      container.getHistogram(name, bucketType).update(value);
+      return Optional.of(container.getHistogram(name, bucketType));
+    }
+  }
+
+  @Override
+  public void update(double value) {
+    Optional<Histogram> histogram = getHistogram();
+    if (histogram.isPresent()) {
+      histogram.get().update(value);
+    }
+  }
+
+  @Override
+  public void update(double... values) {

Review Comment:
   nit: can write like getHistogram.ifPresent(histogram -> 
histogram.update(values));



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to