Make sure that we default to alwaysRetry instead of passing in a null retry 
policy.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc24c86e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc24c86e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc24c86e

Branch: refs/heads/master
Commit: cc24c86e5e17f9ac2ede45a6b6904dd23e90c014
Parents: 0d5d00d
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Tue Sep 19 10:40:12 2017 -0700
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Sep 21 20:09:21 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cc24c86e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3cb0d3b..3a4b699 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -34,6 +34,7 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Predicates;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -1278,9 +1279,12 @@ public class BigQueryIO {
             getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
             "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
                 + " PCollection.");
+        InsertRetryPolicy retryPolicy = MoreObjects.firstNonNull(
+            getFailedInsertRetryPolicy(), InsertRetryPolicy.alwaysRetry());
+
         StreamingInserts<DestinationT> streamingInserts =
             new StreamingInserts<>(getCreateDisposition(), dynamicDestinations)
-                .withInsertRetryPolicy(getFailedInsertRetryPolicy())
+                .withInsertRetryPolicy(retryPolicy)
                 .withTestServices((getBigQueryServices()));
         return rowsWithDestination.apply(streamingInserts);
       } else {

Reply via email to