robinyqiu commented on a change in pull request #12403:
URL: https://github.com/apache/beam/pull/12403#discussion_r470228614



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -520,8 +526,11 @@ public int getSize() {
     private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) {
       CounterStructuredName structuredName =
           stepCounterUpdate.getStructuredNameAndMetadata().getName();
-      if (THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
-          && THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName())) {
+      if ((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
+              && THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
+          || (BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE.equals(

Review comment:
       Yes. The GCS and Datastore counters are consumed only by the batch 
worker (the `THROTTLING_MSECS_METRIC_NAME` counter here is a separate counter; 
I am not sure what it is. Maybe all throttling metrics should go to this 
counter? @ihji).
   
   In the streaming case, the counter has millisecond precision, whereas the 
GCS and Datastore counters store only seconds.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
##########
@@ -543,6 +547,16 @@ public Long extractThrottleTime() {
         totalThrottleTime += httpClientApiThrottlingTime.getCumulative();
       }
 
+      CounterCell bigqueryStreamingInsertThrottleTime =
+          container.tryGetCounter(
+              MetricName.named(
+                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
+                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAME));

Review comment:
       Seconds would be enough here, but the streaming side needs msec, which 
is why I kept msec.
   
   For consistency, we could change all counters to record msec at the source 
and do the msec-to-sec conversion here. WDYT?
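
   To make the proposal concrete, here is a minimal sketch of what "record msec everywhere, convert once at extraction" could look like. The class and method names are hypothetical and not part of Beam; the only real API used is `java.util.concurrent.TimeUnit`. It assumes every throttling counter reports a cumulative value in milliseconds.

```java
import java.util.concurrent.TimeUnit;

public class ThrottleTimeConversion {
  // Hypothetical helper (not a Beam API): sums throttle counters that were all
  // recorded in milliseconds and converts the total to seconds exactly once.
  static long totalThrottleSeconds(long... throttleMillis) {
    long totalMillis = 0;
    for (long millis : throttleMillis) {
      totalMillis += millis;
    }
    // TimeUnit truncates toward zero, so convert the sum, not each counter.
    return TimeUnit.MILLISECONDS.toSeconds(totalMillis);
  }

  public static void main(String[] args) {
    // 1500 + 700 + 900 = 3100 msec in total.
    System.out.println(totalThrottleSeconds(1500, 700, 900));
  }
}
```

   Converting after summing matters because the conversion truncates: in the example above, converting each counter separately would yield 1 + 0 + 0 seconds instead of 3.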

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -867,6 +872,7 @@ public void deleteDataset(String projectId, String datasetId)
         }
         try {
           sleeper.sleep(nextBackoffMillis);
+          throttlingMilliSeconds.inc(nextBackoffMillis);

Review comment:
       The failures retried here are transient failures, which I believe 
include throttling. I considered incrementing the counter at backoff1, but 
that runs in a future (a parallel thread). If we accumulate the counter across 
all threads, I think we would overcount, so I increment the counter here in 
the main thread.
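
   The overcounting concern can be illustrated with a small, self-contained sketch. This is not the Beam code: the class, method, and variable names are hypothetical, and the actual sleeping is omitted so that the numbers are deterministic. It assumes several parallel tasks back off for the same interval at roughly the same time, while the main thread backs off once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class ThrottleCountingSketch {
  // Hypothetical simulation: returns {sum over parallel tasks, main-thread count} in msec.
  static long[] simulate(int workers, long backoffMillis) {
    AtomicLong perWorkerTotal = new AtomicLong();
    ExecutorService executor = Executors.newFixedThreadPool(workers);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < workers; i++) {
        // Each parallel task records its own backoff (the sleep itself is omitted).
        futures.add(executor.submit(() -> {
          perWorkerTotal.addAndGet(backoffMillis);
        }));
      }
      for (Future<?> future : futures) {
        future.get(); // wait for every task to finish
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdown();
    }
    // The main thread records a single backoff of the same length.
    long mainThreadTotal = backoffMillis;
    return new long[] {perWorkerTotal.get(), mainThreadTotal};
  }

  public static void main(String[] args) {
    long[] totals = simulate(4, 100);
    System.out.println("summed across tasks: " + totals[0] + " msec");
    System.out.println("main thread only:    " + totals[1] + " msec");
  }
}
```

   With 4 tasks and a 100 msec backoff, the per-task sum is 400 msec while the main-thread count is 100 msec, which is why counting in a single place avoids inflating the reported throttle time when the waits overlap.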





