Make sure that we default to alwaysRetry instead of passing in a null retry policy.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc24c86e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc24c86e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc24c86e Branch: refs/heads/master Commit: cc24c86e5e17f9ac2ede45a6b6904dd23e90c014 Parents: 0d5d00d Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com> Authored: Tue Sep 19 10:40:12 2017 -0700 Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com> Committed: Thu Sep 21 20:09:21 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cc24c86e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3cb0d3b..3a4b699 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -34,6 +34,7 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.base.Predicates; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -1278,9 +1279,12 @@ public class BigQueryIO { getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE, "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded" + " PCollection."); + InsertRetryPolicy retryPolicy = MoreObjects.firstNonNull( + getFailedInsertRetryPolicy(), InsertRetryPolicy.alwaysRetry()); + StreamingInserts<DestinationT> streamingInserts = new StreamingInserts<>(getCreateDisposition(), dynamicDestinations) - .withInsertRetryPolicy(getFailedInsertRetryPolicy()) + .withInsertRetryPolicy(retryPolicy) .withTestServices((getBigQueryServices())); return rowsWithDestination.apply(streamingInserts); } else {