DataflowPipelineJob: Retry messages, metrics, and status polls. At some point in the past, we decided to use a raw Dataflow client that does not do retries when checking job status, because it was best-effort reporting to users. The purported goal was to not clutter the log with networking errors.
However, since that time, we have: * Added the ability to suppress logs (emit only at DEBUG level or not at all) when retrying. * Increased reliability of the job checking status so that these errors are less frequent and more indicative of quota or other issues. * Started using the metrics in tests, where we do need to retry transient issues (BEAM-350). So let's drop the raw transport client and just use the one that retries. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e471ced4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e471ced4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e471ced4 Branch: refs/heads/master Commit: e471ced41d3092a7af6b9f5ad4cb5a4cd9dc2e81 Parents: 1a200a6 Author: Dan Halperin <[email protected]> Authored: Thu Jun 16 08:57:18 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Jun 20 10:25:58 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 4 ++-- .../beam/runners/dataflow/util/DataflowTransport.java | 11 ----------- 2 files changed, 2 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e471ced4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 91e34ac..5818ba5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -633,12 +633,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // regularly and need not be retried automatically. DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(options.getProject(), jobResult.getId(), - DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms); + options.getDataflowClient(), aggregatorTransforms); // If the service returned client request id, the SDK needs to compare it // with the original id generated in the request, if they are not the same // (i.e., the returned job is not created by this request), throw - // DataflowJobAlreadyExistsException or DataflowJobAlreadyUpdatedExcetpion + // DataflowJobAlreadyExistsException or DataflowJobAlreadyUpdatedException // depending on whether this is a reload or not. if (jobResult.getClientRequestId() != null && !jobResult.getClientRequestId().isEmpty() && !jobResult.getClientRequestId().equals(requestId)) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e471ced4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index 09fca94..f988749 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -92,17 +92,6 @@ public class DataflowTransport { .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); } - /** - * Returns a Dataflow client that does not automatically retry failed 
- * requests. - */ - public static Dataflow.Builder - newRawDataflowClient(DataflowPipelineOptions options) { - return newDataflowClient(options) - .setHttpRequestInitializer(options.getGcpCredential()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - private static HttpRequestInitializer chainHttpRequestInitializer( Credential credential, HttpRequestInitializer httpRequestInitializer) { if (credential == null) {
