robinyqiu commented on a change in pull request #12403:
URL: https://github.com/apache/beam/pull/12403#discussion_r470228614
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -520,8 +526,11 @@ public int getSize() {
private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) {
CounterStructuredName structuredName =
stepCounterUpdate.getStructuredNameAndMetadata().getName();
- if (THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
-     && THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName())) {
+ if ((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
+         && THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
+     || (BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE.equals(
Review comment:
Yes. The GCS and Datastore counters are only consumed by the batch worker (the
`THROTTLING_MSECS_METRIC_NAME` counter here is a separate counter; I am not
sure what it is for. Maybe all throttling metrics should go to this counter?
@ihji ).
Here in the streaming case, precision is in milliseconds (whereas the GCS and
Datastore counters only store seconds).
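A hedged sketch of the extended name-matching predicate in the diff above. The diff is truncated mid-expression, so the second operand of the new clause, the string constants, and the class/method names below are illustrative guesses, not the actual Dataflow worker code.

```java
// Simplified stand-in for the counter-name matching in
// translateKnownStepCounters(); all names and values here are illustrative.
public class CounterMatchSketch {
  // Illustrative stand-ins for the real metric-name constants.
  static final String THROTTLING_NAMESPACE = "dataflow-throttling-metrics";
  static final String THROTTLING_NAME = "throttling-msecs";
  static final String BQ_INSERT_THROTTLE_NAMESPACE = "bigquery-streaming-insert";

  static boolean isThrottlingCounter(String originNamespace, String name) {
    // Existing clause: the generic per-worker throttling counter...
    return (THROTTLING_NAMESPACE.equals(originNamespace)
            && THROTTLING_NAME.equals(name))
        // ...new clause: the BigQuery streaming-insert throttle counter,
        // matched here on namespace only (a guess; the diff is cut off).
        || BQ_INSERT_THROTTLE_NAMESPACE.equals(originNamespace);
  }

  public static void main(String[] args) {
    System.out.println(
        isThrottlingCounter("dataflow-throttling-metrics", "throttling-msecs"));
    System.out.println(isThrottlingCounter("other-namespace", "throttling-msecs"));
  }
}
```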
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
##########
@@ -543,6 +547,16 @@ public Long extractThrottleTime() {
totalThrottleTime += httpClientApiThrottlingTime.getCumulative();
}
+    CounterCell bigqueryStreamingInsertThrottleTime =
+        container.tryGetCounter(
+            MetricName.named(
+                BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
+                BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAME));
Review comment:
Here we can use seconds, but on the streaming side msec is needed.
That's why I kept msec.
For consistency, we could change all counters to record msec at the source,
and do the msec-to-sec conversion here. WDYT?
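The proposal above (record everything in msec, convert once at the aggregation point) can be sketched as follows. The field names and values are hypothetical, and the class is a simplified stand-in for `BatchModeExecutionContext.extractThrottleTime()`, not the actual Beam code.

```java
import java.util.concurrent.TimeUnit;

// Sketch: if every throttling counter were kept in milliseconds at the
// source, a single msec -> sec conversion at the reporting boundary would
// suffice. All names and values below are illustrative.
public class ThrottleTimeSketch {
  // Hypothetical cumulative throttle values, in milliseconds.
  static long gcsThrottleMsecs = 2_500;
  static long datastoreThrottleMsecs = 1_499;
  static long bigqueryInsertThrottleMsecs = 750;

  static long extractThrottleTimeSeconds() {
    long totalMsecs =
        gcsThrottleMsecs + datastoreThrottleMsecs + bigqueryInsertThrottleMsecs;
    // One conversion here; note it truncates sub-second remainders.
    return TimeUnit.MILLISECONDS.toSeconds(totalMsecs);
  }

  public static void main(String[] args) {
    System.out.println(extractThrottleTimeSeconds()); // 4 (4749 ms truncated)
  }
}
```

One design note on this sketch: converting at the boundary truncates up to 999 ms per report, which is the precision trade-off the comment is weighing against keeping seconds at the source.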
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -867,6 +872,7 @@ public void deleteDataset(String projectId, String datasetId)
}
try {
sleeper.sleep(nextBackoffMillis);
+ throttlingMilliSeconds.inc(nextBackoffMillis);
Review comment:
The failures retried here are transient failures, which I believe include
throttling. I thought about incrementing the counter in the backoff path, but
that is executed in a future (a parallel thread). If we accumulated counters
across all threads, I think we would over-count. So I add the counter here in
the main thread.
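The single-thread counting argument above can be sketched like this. `Sleeper` and the counter below are simplified stand-ins for the Beam/google-http-client types, and the backoff values are made up.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: increment the throttling counter in the calling thread, once per
// retry, rather than inside each retry future, so parallel threads do not
// each add to the total. Names and values are illustrative.
public class RetryThrottleSketch {
  // Stand-in for the Beam Counter (throttlingMilliSeconds in the diff).
  static final AtomicLong throttlingMsecs = new AtomicLong();

  // Stand-in for com.google.api.client.util.Sleeper.
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  static void retryWithBackoff(Sleeper sleeper, long[] backoffsMillis)
      throws InterruptedException {
    for (long nextBackoffMillis : backoffsMillis) {
      sleeper.sleep(nextBackoffMillis);
      // Counted here, in the main thread, mirroring the added line in the
      // diff; counting inside each parallel future would over-count.
      throttlingMsecs.addAndGet(nextBackoffMillis);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // No-op sleeper so the sketch runs instantly.
    retryWithBackoff(millis -> {}, new long[] {100, 200, 400});
    System.out.println(throttlingMsecs.get()); // 700
  }
}
```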
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]