chamikaramj commented on a change in pull request #12403:
URL: https://github.com/apache/beam/pull/12403#discussion_r464549909



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
##########
@@ -543,6 +547,16 @@ public Long extractThrottleTime() {
         totalThrottleTime += httpClientApiThrottlingTime.getCumulative();
       }
 
+      CounterCell bigqueryStreamingInsertThrottleTime =
+          container.tryGetCounter(
+              MetricName.named(
+                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
+                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAME));

Review comment:
       Can we use the same name as above ("cumulativeThrottlingSeconds") and 
move it to a constant (and also do ms to sec conversion when setting) ?

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -520,8 +526,11 @@ public int getSize() {
     private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) {
       CounterStructuredName structuredName =
           stepCounterUpdate.getStructuredNameAndMetadata().getName();
-      if 
(THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
-          && 
THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName())) {
+      if 
((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
+              && 
THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
+          || (BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE.equals(

Review comment:
       Is there a reason why we needed to use a  unique name for BQ but not for 
GCS or Datastore ?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -867,6 +872,7 @@ public void deleteDataset(String projectId, String 
datasetId)
         }
         try {
           sleeper.sleep(nextBackoffMillis);
+          throttlingMilliSeconds.inc(nextBackoffMillis);

Review comment:
       This is for failures. Probably you need to increment the counter for 
backoff1 for rate limit errors above.
   
   cc: @ihji 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to