Repository: beam
Updated Branches:
  refs/heads/master eee6726aa -> 53c9bf4cd


Dead-letter support for BigQuery. Allow users to specify a retry policy, and 
get failed inserts back.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a57ff0ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a57ff0ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a57ff0ee

Branch: refs/heads/master
Commit: a57ff0eef1b7166d02243e19c4632f8924a014f0
Parents: eee6726
Author: Reuven Lax <[email protected]>
Authored: Thu May 11 06:40:47 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Jun 2 13:15:01 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  8 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 22 ++++-
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  8 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 39 +++++----
 .../sdk/io/gcp/bigquery/InsertRetryPolicy.java  | 86 +++++++++++++++++++
 .../sdk/io/gcp/bigquery/StreamingInserts.java   | 35 ++++++--
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   | 50 ++++++++---
 .../io/gcp/bigquery/StreamingWriteTables.java   | 26 ++++--
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   | 27 ++++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 56 +++++++++++++
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 88 +++++++++++++++++---
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   | 11 ++-
 .../sdk/io/gcp/bigquery/FakeDatasetService.java | 75 ++++++++++++++++-
 .../io/gcp/bigquery/InsertRetryPolicyTest.java  | 79 ++++++++++++++++++
 14 files changed, 540 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 0abd469..3686f99 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to 
BigQuery. */
 class BatchLoads<DestinationT>
@@ -248,7 +249,8 @@ class BatchLoads<DestinationT>
     // This transform will look at the set of files written for each table, 
and if any table has
     // too many files or bytes, will partition that table's files into 
multiple partitions for
     // loading.
-    PCollection<Void> singleton = p.apply(Create.of((Void) 
null).withCoder(VoidCoder.of()));
+    PCollection<Void> singleton = p.apply("singleton",
+        Create.of((Void) null).withCoder(VoidCoder.of()));
     PCollectionTuple partitions =
         singleton.apply(
             "WritePartition",
@@ -333,6 +335,8 @@ class BatchLoads<DestinationT>
                         dynamicDestinations))
                 .withSideInputs(writeTablesSideInputs));
 
-    return WriteResult.in(input.getPipeline());
+    PCollection<TableRow> empty =
+        p.apply("CreateEmptyFailedInserts", 
Create.empty(TypeDescriptor.of(TableRow.class)));
+    return WriteResult.in(input.getPipeline(), new 
TupleTag<TableRow>("failedInserts"), empty);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index cf258ca..6a93279 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -674,6 +674,7 @@ public class BigQueryIO {
     abstract BigQueryServices getBigQueryServices();
     @Nullable abstract Integer getMaxFilesPerBundle();
     @Nullable abstract Long getMaxFileSize();
+    @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy();
 
     abstract Builder<T> toBuilder();
 
@@ -693,6 +694,7 @@ public class BigQueryIO {
       abstract Builder<T> setBigQueryServices(BigQueryServices 
bigQueryServices);
       abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle);
       abstract Builder<T> setMaxFileSize(Long maxFileSize);
+      abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy 
retryPolicy);
 
       abstract Write<T> build();
     }
@@ -861,6 +863,17 @@ public class BigQueryIO {
       return toBuilder().setTableDescription(tableDescription).build();
     }
 
+    /** Specifies a policy for handling failed inserts.
+     *
+     * <p>Currently this is only allowed when writing an unbounded collection 
to BigQuery. Bounded
+     * collections are written using batch load jobs, so we don't get 
per-element failures.
+     * Unbounded collections are written using streaming inserts, so we have 
access to per-element
+     * insert results.
+     */
+    public Write<T> withFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy) 
{
+      return toBuilder().setFailedInsertRetryPolicy(retryPolicy).build();
+    }
+
     /** Disables BigQuery table validation. */
     public Write<T> withoutValidation() {
       return toBuilder().setValidate(false).build();
@@ -935,6 +948,7 @@ public class BigQueryIO {
           "No more than one of jsonSchema, schemaFromView, or 
dynamicDestinations may "
               + "be set");
 
+
       DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
       if (dynamicDestinations == null) {
         if (getJsonTableRef() != null) {
@@ -981,10 +995,14 @@ public class BigQueryIO {
             "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
                 + " PCollection.");
         StreamingInserts<DestinationT> streamingInserts =
-            new StreamingInserts<>(getCreateDisposition(), 
dynamicDestinations);
-        streamingInserts.setTestServices(getBigQueryServices());
+            new StreamingInserts<>(getCreateDisposition(), dynamicDestinations)
+                .withInsertRetryPolicy(getFailedInsertRetryPolicy())
+                .withTestServices((getBigQueryServices()));
         return rowsWithDestination.apply(streamingInserts);
       } else {
+        checkArgument(getFailedInsertRetryPolicy() == null,
+            "Record-insert retry policies are not supported when using 
BigQuery load jobs.");
+
         BatchLoads<DestinationT> batchLoads = new BatchLoads<>(
             getWriteDisposition(),
             getCreateDisposition(),

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 1ae10bc..c067229 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -33,6 +33,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /** An interface for real, mock, or fake implementations of Cloud BigQuery 
services. */
 interface BigQueryServices extends Serializable {
@@ -161,9 +162,14 @@ interface BigQueryServices extends Serializable {
     /**
      * Inserts {@link TableRow TableRows} with the specified insertIds if not 
null.
      *
+     * <p>If any insert fails permanently according to the retry policy, those 
rows are added
+     * to failedInserts.
+     *
      * <p>Returns the total bytes count of {@link TableRow TableRows}.
      */
-    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable 
List<String> insertIdList)
+    long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> 
rowList,
+                   @Nullable List<String> insertIdList, InsertRetryPolicy 
retryPolicy,
+                   List<ValueInSingleWindow<TableRow>> failedInserts)
         throws IOException, InterruptedException;
 
     /** Patch BigQuery {@link Table} description. */

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 5d5a519..b14405e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -656,8 +657,11 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
 
     @VisibleForTesting
-    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable 
List<String> insertIdList,
-        BackOff backoff, final Sleeper sleeper) throws IOException, 
InterruptedException {
+    long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> 
rowList,
+                   @Nullable List<String> insertIdList,
+                   BackOff backoff, final Sleeper sleeper, InsertRetryPolicy 
retryPolicy,
+                   List<ValueInSingleWindow<TableRow>> failedInserts)
+        throws IOException, InterruptedException {
       checkNotNull(ref, "ref");
       if (executor == null) {
         this.executor = options.as(GcsOptions.class).getExecutorService();
@@ -671,10 +675,10 @@ class BigQueryServicesImpl implements BigQueryServices {
       List<TableDataInsertAllResponse.InsertErrors> allErrors = new 
ArrayList<>();
      // These lists contain the rows to publish. Initially they contain the 
entire list.
       // If there are failures, they will contain only the failed rows to be 
retried.
-      List<TableRow> rowsToPublish = rowList;
+      List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList;
       List<String> idsToPublish = insertIdList;
       while (true) {
-        List<TableRow> retryRows = new ArrayList<>();
+        List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>();
         List<String> retryIds = (idsToPublish != null) ? new 
ArrayList<String>() : null;
 
         int strideIndex = 0;
@@ -686,7 +690,7 @@ class BigQueryServicesImpl implements BigQueryServices {
         List<Integer> strideIndices = new ArrayList<>();
 
         for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i);
+          TableRow row = rowsToPublish.get(i).getValue();
           TableDataInsertAllRequest.Rows out = new 
TableDataInsertAllRequest.Rows();
           if (idsToPublish != null) {
             out.setInsertId(idsToPublish.get(i));
@@ -743,18 +747,23 @@ class BigQueryServicesImpl implements BigQueryServices {
         try {
           for (int i = 0; i < futures.size(); i++) {
             List<TableDataInsertAllResponse.InsertErrors> errors = 
futures.get(i).get();
-            if (errors != null) {
-              for (TableDataInsertAllResponse.InsertErrors error : errors) {
-                allErrors.add(error);
-                if (error.getIndex() == null) {
-                  throw new IOException("Insert failed: " + allErrors);
-                }
+            if (errors == null) {
+              continue;
+            }
+            for (TableDataInsertAllResponse.InsertErrors error : errors) {
+              if (error.getIndex() == null) {
+                throw new IOException("Insert failed: " + error + ", other 
errors: " + allErrors);
+              }
 
-                int errorIndex = error.getIndex().intValue() + 
strideIndices.get(i);
+              int errorIndex = error.getIndex().intValue() + 
strideIndices.get(i);
+              if (retryPolicy.shouldRetry(new 
InsertRetryPolicy.Context(error))) {
+                allErrors.add(error);
                 retryRows.add(rowsToPublish.get(errorIndex));
                 if (retryIds != null) {
                   retryIds.add(idsToPublish.get(errorIndex));
                 }
+              } else {
+                failedInserts.add(rowsToPublish.get(errorIndex));
               }
             }
           }
@@ -793,13 +802,15 @@ class BigQueryServicesImpl implements BigQueryServices {
 
     @Override
     public long insertAll(
-        TableReference ref, List<TableRow> rowList, @Nullable List<String> 
insertIdList)
+        TableReference ref, List<ValueInSingleWindow<TableRow>> rowList,
+        @Nullable List<String> insertIdList,
+        InsertRetryPolicy retryPolicy, List<ValueInSingleWindow<TableRow>> 
failedInserts)
         throws IOException, InterruptedException {
       return insertAll(
           ref, rowList, insertIdList,
           BackOffAdapter.toGcpBackOff(
               INSERT_BACKOFF_FACTORY.backoff()),
-          Sleeper.DEFAULT);
+          Sleeper.DEFAULT, retryPolicy, failedInserts);
     }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java
new file mode 100644
index 0000000..90a3d0d
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.Set;
+
+/** A retry policy for streaming BigQuery inserts. */
+public abstract class InsertRetryPolicy implements Serializable {
+  /**
+   * Contains information about a failed insert.
+   *
+   * <p>Currently only the list of errors returned from BigQuery. In the 
future this may contain
+   * more information - e.g. how many times this insert has been retried, and 
for how long.
+   */
+  public static class Context {
+    // A list of all errors corresponding to an attempted insert of a single 
record.
+    TableDataInsertAllResponse.InsertErrors errors;
+
+    public Context(TableDataInsertAllResponse.InsertErrors errors) {
+      this.errors = errors;
+    }
+  }
+
+  // A list of known persistent errors for which retrying never helps.
+  static final Set<String> PERSISTENT_ERRORS =
+      ImmutableSet.of("invalid", "invalidQuery", "notImplemented");
+
+  /** Return true if this failure should be retried. */
+  public abstract boolean shouldRetry(Context context);
+
+  /** Never retry any failures. */
+  public static InsertRetryPolicy neverRetry() {
+    return new InsertRetryPolicy() {
+      @Override
+      public boolean shouldRetry(Context context) {
+        return false;
+      }
+    };
+  }
+
+  /** Always retry all failures. */
+  public static InsertRetryPolicy alwaysRetry() {
+    return new InsertRetryPolicy() {
+      @Override
+      public boolean shouldRetry(Context context) {
+        return true;
+      }
+    };
+  }
+
+  /** Retry all failures except for known persistent errors. */
+  public static InsertRetryPolicy retryTransientErrors() {
+    return new InsertRetryPolicy() {
+      @Override
+      public boolean shouldRetry(Context context) {
+        if (context.errors.getErrors() != null) {
+          for (ErrorProto error : context.errors.getErrors()) {
+            if (error.getReason() != null && 
PERSISTENT_ERRORS.contains(error.getReason())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index 9cb0027..ba09cb3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -35,19 +35,39 @@ public class StreamingInserts<DestinationT>
   private BigQueryServices bigQueryServices;
   private final CreateDisposition createDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private InsertRetryPolicy retryPolicy;
 
   /** Constructor. */
-  StreamingInserts(CreateDisposition createDisposition,
+  public StreamingInserts(CreateDisposition createDisposition,
                    DynamicDestinations<?, DestinationT> dynamicDestinations) {
+    this(createDisposition, dynamicDestinations, new BigQueryServicesImpl(),
+        InsertRetryPolicy.alwaysRetry());
+  }
+
+  /** Constructor. */
+  private StreamingInserts(CreateDisposition createDisposition,
+                          DynamicDestinations<?, DestinationT> 
dynamicDestinations,
+                          BigQueryServices bigQueryServices,
+                          InsertRetryPolicy retryPolicy) {
     this.createDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
-    this.bigQueryServices = new BigQueryServicesImpl();
+    this.bigQueryServices = bigQueryServices;
+    this.retryPolicy = retryPolicy;
   }
 
-  void setTestServices(BigQueryServices bigQueryServices) {
-    this.bigQueryServices = bigQueryServices;
+  /**
+   * Specify a retry policy for failed inserts.
+   */
+  public StreamingInserts<DestinationT> 
withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
+    return new StreamingInserts<>(
+        createDisposition, dynamicDestinations, bigQueryServices, retryPolicy);
   }
 
+  StreamingInserts<DestinationT> withTestServices(BigQueryServices 
bigQueryServices) {
+    return new StreamingInserts<>(
+        createDisposition, dynamicDestinations, bigQueryServices, 
retryPolicy);  }
+
+
   @Override
   protected Coder<Void> getDefaultOutputCoder() {
     return VoidCoder.of();
@@ -58,9 +78,12 @@ public class StreamingInserts<DestinationT>
     PCollection<KV<TableDestination, TableRow>> writes =
         input.apply(
             "CreateTables",
-            new CreateTables<DestinationT>(createDisposition, 
dynamicDestinations)
+            new CreateTables<>(createDisposition, dynamicDestinations)
                 .withTestServices(bigQueryServices));
 
-    return writes.apply(new 
StreamingWriteTables().withTestServices(bigQueryServices));
+    return writes.apply(
+        new StreamingWriteTables()
+            .withTestServices(bigQueryServices)
+            .withInsertRetryPolicy(retryPolicy));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index f267976..63e5bc1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -29,8 +30,11 @@ import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
  * Implementation of DoFn to perform streaming BigQuery write.
@@ -40,9 +44,12 @@ import org.apache.beam.sdk.values.KV;
 class StreamingWriteFn
     extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
   private final BigQueryServices bqServices;
+  private final InsertRetryPolicy retryPolicy;
+  private final TupleTag<TableRow> failedOutputTag;
+
 
   /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
-  private transient Map<String, List<TableRow>> tableRows;
+  private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;
 
   /** The list of unique ids for each BigQuery table row. */
   private transient Map<String, List<String>> uniqueIdsForTableRows;
@@ -50,8 +57,11 @@ class StreamingWriteFn
   /** Tracks bytes written, exposed as "ByteCount" Counter. */
   private Counter byteCounter = SinkMetrics.bytesWritten();
 
-  StreamingWriteFn(BigQueryServices bqServices) {
+  StreamingWriteFn(BigQueryServices bqServices, InsertRetryPolicy retryPolicy,
+                   TupleTag<TableRow> failedOutputTag) {
     this.bqServices = bqServices;
+    this.retryPolicy = retryPolicy;
+    this.failedOutputTag = failedOutputTag;
   }
 
   /** Prepares a target BigQuery table. */
@@ -63,27 +73,39 @@ class StreamingWriteFn
 
   /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
   @ProcessElement
-  public void processElement(ProcessContext context) {
+  public void processElement(ProcessContext context, BoundedWindow window) {
     String tableSpec = context.element().getKey().getKey();
-    List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, 
tableSpec);
-    List<String> uniqueIds = 
BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows,
-        tableSpec);
+    List<ValueInSingleWindow<TableRow>> rows =
+        BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec);
+    List<String> uniqueIds =
+        BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, 
tableSpec);
 
-    rows.add(context.element().getValue().tableRow);
+    rows.add(
+        ValueInSingleWindow.of(
+            context.element().getValue().tableRow, context.timestamp(), 
window, context.pane()));
     uniqueIds.add(context.element().getValue().uniqueId);
   }
 
   /** Writes the accumulated rows into BigQuery with streaming API. */
   @FinishBundle
   public void finishBundle(FinishBundleContext context) throws Exception {
+    List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList();
     BigQueryOptions options = 
context.getPipelineOptions().as(BigQueryOptions.class);
-    for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
+    for (Map.Entry<String, List<ValueInSingleWindow<TableRow>>> entry : 
tableRows.entrySet()) {
       TableReference tableReference = 
BigQueryHelpers.parseTableSpec(entry.getKey());
-      flushRows(tableReference, entry.getValue(),
-          uniqueIdsForTableRows.get(entry.getKey()), options);
+      flushRows(
+          tableReference,
+          entry.getValue(),
+          uniqueIdsForTableRows.get(entry.getKey()),
+          options,
+          failedInserts);
     }
     tableRows.clear();
     uniqueIdsForTableRows.clear();
+
+    for (ValueInSingleWindow<TableRow> row : failedInserts) {
+      context.output(failedOutputTag, row.getValue(), row.getTimestamp(), 
row.getWindow());
+    }
   }
 
   @Override
@@ -95,12 +117,14 @@ class StreamingWriteFn
    * Writes the accumulated rows into BigQuery with streaming API.
    */
   private void flushRows(TableReference tableReference,
-      List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions 
options)
-          throws InterruptedException {
+                         List<ValueInSingleWindow<TableRow>> tableRows,
+                         List<String> uniqueIds, BigQueryOptions options,
+                         List<ValueInSingleWindow<TableRow>> failedInserts)
+      throws InterruptedException {
     if (!tableRows.isEmpty()) {
       try {
         long totalBytes = bqServices.getDatasetService(options).insertAll(
-            tableReference, tableRows, uniqueIds);
+            tableReference, tableRows, uniqueIds, retryPolicy, failedInserts);
         byteCounter.inc(totalBytes);
       } catch (IOException e) {
         throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 886236b..18b2033 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -28,6 +28,9 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 
 /**
  * This transform takes in key-value pairs of {@link TableRow} entries and the
@@ -40,17 +43,23 @@ import org.apache.beam.sdk.values.PCollection;
 public class StreamingWriteTables extends PTransform<
     PCollection<KV<TableDestination, TableRow>>, WriteResult> {
   private BigQueryServices bigQueryServices;
+  private InsertRetryPolicy retryPolicy;
 
   public StreamingWriteTables() {
-    this(new BigQueryServicesImpl());
+    this(new BigQueryServicesImpl(), InsertRetryPolicy.alwaysRetry());
   }
 
-  private StreamingWriteTables(BigQueryServices bigQueryServices) {
+  private StreamingWriteTables(BigQueryServices bigQueryServices, 
InsertRetryPolicy retryPolicy) {
     this.bigQueryServices = bigQueryServices;
+    this.retryPolicy = retryPolicy;
   }
 
   StreamingWriteTables withTestServices(BigQueryServices bigQueryServices) {
-    return new StreamingWriteTables(bigQueryServices);
+    return new StreamingWriteTables(bigQueryServices, retryPolicy);
+  }
+
+  StreamingWriteTables withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
+    return new StreamingWriteTables(bigQueryServices, retryPolicy);
   }
 
   @Override
@@ -77,7 +86,9 @@ public class StreamingWriteTables extends PTransform<
     // different unique ids, this implementation relies on "checkpointing", 
which is
     // achieved as a side effect of having StreamingWriteFn immediately follow 
a GBK,
     // performed by Reshuffle.
-    tagged
+    TupleTag<Void> mainOutputTag = new TupleTag<>("mainOutput");
+    TupleTag<TableRow> failedInsertsTag = new TupleTag<>("failedInserts");
+    PCollectionTuple tuple = tagged
         .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), 
TableRowInfoCoder.of()))
         .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
         // Put in the global window to ensure that DynamicDestinations side 
inputs are accessed
@@ -87,7 +98,10 @@ public class StreamingWriteTables extends PTransform<
             .triggering(DefaultTrigger.of()).discardingFiredPanes())
         .apply("StreamingWrite",
             ParDo.of(
-                new StreamingWriteFn(bigQueryServices)));
-    return WriteResult.in(input.getPipeline());
+                new StreamingWriteFn(bigQueryServices, retryPolicy, 
failedInsertsTag))
+            .withOutputTags(mainOutputTag, TupleTagList.of(failedInsertsTag)));
+    PCollection<TableRow> failedInserts = tuple.get(failedInsertsTag);
+    failedInserts.setCoder(TableRowJsonCoder.of());
+    return WriteResult.in(input.getPipeline(), failedInsertsTag, 
failedInserts);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index db0be3a..4f6b23e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import java.util.Collections;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -30,23 +32,30 @@ import org.apache.beam.sdk.values.TupleTag;
  * The result of a {@link BigQueryIO.Write} transform.
  */
 public final class WriteResult implements POutput {
-
   private final Pipeline pipeline;
+  private final TupleTag<TableRow> failedInsertsTag;
+  private final PCollection<TableRow> failedInserts;
 
-  /**
-   * Creates a {@link WriteResult} in the given {@link Pipeline}.
-   */
-  static WriteResult in(Pipeline pipeline) {
-    return new WriteResult(pipeline);
+  /** Creates a {@link WriteResult} in the given {@link Pipeline}. */
+  static WriteResult in(
+      Pipeline pipeline, TupleTag<TableRow> failedInsertsTag, 
PCollection<TableRow> failedInserts) {
+    return new WriteResult(pipeline, failedInsertsTag, failedInserts);
   }
 
   @Override
   public Map<TupleTag<?>, PValue> expand() {
-    return Collections.emptyMap();
+    return ImmutableMap.<TupleTag<?>, PValue>of(failedInsertsTag, 
failedInserts);
   }
 
-  private WriteResult(Pipeline pipeline) {
+  private WriteResult(
+      Pipeline pipeline, TupleTag<TableRow> failedInsertsTag, 
PCollection<TableRow> failedInserts) {
     this.pipeline = pipeline;
+    this.failedInsertsTag = failedInsertsTag;
+    this.failedInserts = failedInserts;
+  }
+
+  public PCollection<TableRow> getFailedInserts() {
+    return failedInserts;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 5408fd4..04bbac4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -35,12 +35,14 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.api.client.util.Data;
+import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatistics2;
 import com.google.api.services.bigquery.model.JobStatistics4;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -126,6 +128,7 @@ import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
@@ -602,6 +605,59 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testRetryPolicy() throws Exception {
+    BigQueryOptions bqOptions = 
TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService())
+        .withDatasetService(datasetService);
+
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    TableRow row1 = new TableRow().set("name", "a").set("number", "1");
+    TableRow row2 = new TableRow().set("name", "b").set("number", "2");
+    TableRow row3 = new TableRow().set("name", "c").set("number", "3");
+
+    TableDataInsertAllResponse.InsertErrors ephemeralError =
+        new TableDataInsertAllResponse.InsertErrors().setErrors(
+            ImmutableList.of(new ErrorProto().setReason("timeout")));
+    TableDataInsertAllResponse.InsertErrors persistentError =
+        new TableDataInsertAllResponse.InsertErrors().setErrors(
+            ImmutableList.of(new ErrorProto().setReason("invalidQuery")));
+
+    datasetService.failOnInsert(
+        ImmutableMap.<TableRow, 
List<TableDataInsertAllResponse.InsertErrors>>of(
+        row1, ImmutableList.of(ephemeralError, ephemeralError),
+        row2, ImmutableList.of(ephemeralError, ephemeralError, 
persistentError)));
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    PCollection<TableRow> failedRows =
+        p.apply(Create.of(row1, row2, row3))
+            .setIsBoundedInternal(IsBounded.UNBOUNDED)
+            
.apply(BigQueryIO.writeTableRows().to("project-id:dataset-id.table-id")
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withSchema(new TableSchema().setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new 
TableFieldSchema().setName("number").setType("INTEGER"))))
+            
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
+            .withTestServices(fakeBqServices)
+            .withoutValidation()).getFailedInserts();
+    // row2 finally fails with a non-retryable error, so we expect to see it 
in the collection of
+    // failed rows.
+    PAssert.that(failedRows).containsInAnyOrder(row2);
+    p.run();
+
+    // Only row1 and row3 were successfully inserted.
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", 
"table-id"),
+        containsInAnyOrder(row1, row3));
+
+  }
+
+  @Test
   public void testWrite() throws Exception {
     BigQueryOptions bqOptions = 
TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultproject");

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index b41490f..f602038 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -58,6 +58,7 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.cloud.hadoop.util.RetryBoundedBackOff;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -67,11 +68,14 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceIm
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
@@ -485,6 +489,11 @@ public class BigQueryServicesImplTest {
     verify(response, times(1)).getContentType();
   }
 
+  private ValueInSingleWindow<TableRow> wrapTableRow(TableRow row) {
+    return ValueInSingleWindow.of(row, GlobalWindow.TIMESTAMP_MAX_VALUE,
+        GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+  }
+
   /**
    * Tests that {@link DatasetServiceImpl#insertAll} retries quota rate 
limited attempts.
    */
@@ -492,8 +501,8 @@ public class BigQueryServicesImplTest {
   public void testInsertRetry() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = new ArrayList<>();
-    rows.add(new TableRow());
+    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    rows.add(wrapTableRow(new TableRow()));
 
     // First response is 403 rate limited, second response has valid payload.
     when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
@@ -505,7 +514,8 @@ public class BigQueryServicesImplTest {
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
     dataService.insertAll(ref, rows, null,
-        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new 
MockSleeper());
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(),
+        InsertRetryPolicy.alwaysRetry(), null);
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -524,8 +534,9 @@ public class BigQueryServicesImplTest {
   public void testInsertRetrySelectRows() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = ImmutableList.of(
-        new TableRow().set("row", "a"), new TableRow().set("row", "b"));
+    List<ValueInSingleWindow<TableRow>> rows = ImmutableList.of(
+        wrapTableRow(new TableRow().set("row", "a")),
+        wrapTableRow(new TableRow().set("row", "b")));
     List<String> insertIds = ImmutableList.of("a", "b");
 
     final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse()
@@ -542,11 +553,11 @@ public class BigQueryServicesImplTest {
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
     dataService.insertAll(ref, rows, insertIds,
-        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new 
MockSleeper());
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(),
+        InsertRetryPolicy.alwaysRetry(), null);
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
-    expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
   }
 
   /**
@@ -556,7 +567,8 @@ public class BigQueryServicesImplTest {
   public void testInsertFailsGracefully() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = ImmutableList.of(new TableRow(), new TableRow());
+    List<ValueInSingleWindow<TableRow>> rows = ImmutableList.of(
+        wrapTableRow(new TableRow()), wrapTableRow(new TableRow()));
 
     final TableDataInsertAllResponse row1Failed = new 
TableDataInsertAllResponse()
         .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));
@@ -584,7 +596,8 @@ public class BigQueryServicesImplTest {
     // Expect it to fail.
     try {
       dataService.insertAll(ref, rows, null,
-          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new 
MockSleeper());
+          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new 
MockSleeper(),
+          InsertRetryPolicy.alwaysRetry(), null);
       fail();
     } catch (IOException e) {
       assertThat(e, instanceOf(IOException.class));
@@ -606,8 +619,8 @@ public class BigQueryServicesImplTest {
   public void testInsertDoesNotRetry() throws Throwable {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = new ArrayList<>();
-    rows.add(new TableRow());
+    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    rows.add(wrapTableRow(new TableRow()));
 
     // First response is 403 not-rate-limited, second response has valid 
payload but should not
     // be invoked.
@@ -625,7 +638,8 @@ public class BigQueryServicesImplTest {
 
     try {
       dataService.insertAll(ref, rows, null,
-          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new 
MockSleeper());
+          BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new 
MockSleeper(),
+          InsertRetryPolicy.alwaysRetry(), null);
       fail();
     } catch (RuntimeException e) {
       verify(response, times(1)).getStatusCode();
@@ -635,6 +649,56 @@ public class BigQueryServicesImplTest {
     }
   }
 
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} uses the supplied {@link 
InsertRetryPolicy},
+   * and returns the list of rows not retried.
+   */
+  @Test
+  public void testInsertRetryPolicy() throws InterruptedException, IOException 
{
+    TableReference ref =
+        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<ValueInSingleWindow<TableRow>> rows = ImmutableList.of(
+        wrapTableRow(new TableRow()), wrapTableRow(new TableRow()));
+
+    // First time row0 fails with a retryable error, and row1 fails with a 
persistent error.
+    final TableDataInsertAllResponse firstFailure = new 
TableDataInsertAllResponse()
+        .setInsertErrors(ImmutableList.of(
+            new InsertErrors().setIndex(0L).setErrors(
+                ImmutableList.of(new ErrorProto().setReason("timeout"))),
+            new InsertErrors().setIndex(1L).setErrors(
+            ImmutableList.of(new ErrorProto().setReason("invalid")))));
+
+    // Second time there is only one row, which fails with a retryable error.
+    final TableDataInsertAllResponse secondFialure = new 
TableDataInsertAllResponse()
+        .setInsertErrors(ImmutableList.of(new 
InsertErrors().setIndex(0L).setErrors(
+            ImmutableList.of(new ErrorProto().setReason("timeout")))));
+
+    // On the final attempt, no failures are returned.
+    final TableDataInsertAllResponse allRowsSucceeded = new 
TableDataInsertAllResponse();
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    // Always return 200.
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200).thenReturn(200);
+
+    // The first two attempts return failures (firstFailure, secondFailure); the third succeeds.
+    when(response.getContent())
+        .thenReturn(toStream(firstFailure))
+        .thenReturn(toStream(secondFialure))
+        .thenReturn(toStream(allRowsSucceeded));
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList();
+    dataService.insertAll(ref, rows, null,
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(),
+        InsertRetryPolicy.retryTransientErrors(), failedInserts);
+    assertEquals(1, failedInserts.size());
+    expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
+  }
+
   /** A helper to wrap a {@link GenericJson} object in a content stream. */
   private static InputStream toStream(GenericJson content) throws IOException {
     return new 
ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index fa84119..43290dc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -49,6 +49,9 @@ import java.util.List;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Assert;
@@ -391,16 +394,18 @@ public class BigQueryUtilTest {
         .parseTableSpec("project:dataset.table");
     DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, 
options, 5);
 
-    List<TableRow> rows = new ArrayList<>();
+    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
     List<String> ids = new ArrayList<>();
     for (int i = 0; i < 25; ++i) {
-      rows.add(rawRow("foo", 1234));
+      rows.add(ValueInSingleWindow.of(rawRow("foo", 1234), 
GlobalWindow.TIMESTAMP_MAX_VALUE,
+          GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING));
       ids.add(new String());
     }
 
     long totalBytes = 0;
     try {
-      totalBytes = datasetService.insertAll(ref, rows, ids);
+      totalBytes = datasetService.insertAll(ref, rows, ids, 
InsertRetryPolicy.alwaysRetry(),
+          null);
     } finally {
       verifyInsertAll(5);
       // Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 5103adb..6ee5340 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -25,9 +25,11 @@ import com.google.api.client.http.HttpHeaders;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
@@ -36,9 +38,15 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /** A fake dataset service that can be serialized, for use in 
testReadFromTable. */
 class FakeDatasetService implements DatasetService, Serializable {
+  Map<String, List<String>> insertErrors = Maps.newHashMap();
+
   @Override
   public Table getTable(TableReference tableRef)
       throws InterruptedException, IOException {
@@ -164,10 +172,24 @@ class FakeDatasetService implements DatasetService, 
Serializable {
     }
   }
 
+  public long insertAll(TableReference ref, List<TableRow> rowList,
+                        @Nullable List<String> insertIdList)
+      throws IOException, InterruptedException {
+    List<ValueInSingleWindow<TableRow>> windowedRows = Lists.newArrayList();
+    for (TableRow row : rowList) {
+      windowedRows.add(ValueInSingleWindow.of(row, 
GlobalWindow.TIMESTAMP_MAX_VALUE,
+          GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    return insertAll(ref, windowedRows, insertIdList, 
InsertRetryPolicy.alwaysRetry(), null);
+  }
+
   @Override
   public long insertAll(
-      TableReference ref, List<TableRow> rowList, @Nullable List<String> 
insertIdList)
+      TableReference ref, List<ValueInSingleWindow<TableRow>> rowList,
+      @Nullable List<String> insertIdList,
+      InsertRetryPolicy retryPolicy, List<ValueInSingleWindow<TableRow>> 
failedInserts)
       throws IOException, InterruptedException {
+    Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> insertErrors 
= getInsertErrors();
     synchronized (BigQueryIOTest.tables) {
       if (insertIdList != null) {
         assertEquals(rowList.size(), insertIdList.size());
@@ -182,7 +204,21 @@ class FakeDatasetService implements DatasetService, 
Serializable {
       TableContainer tableContainer = getTableContainer(
           ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
       for (int i = 0; i < rowList.size(); ++i) {
-        dataSize += tableContainer.addRow(rowList.get(i), insertIdList.get(i));
+        TableRow row = rowList.get(i).getValue();
+        List<TableDataInsertAllResponse.InsertErrors> allErrors = 
insertErrors.get(row);
+        boolean shouldInsert = true;
+        if (allErrors != null) {
+          for (TableDataInsertAllResponse.InsertErrors errors : allErrors) {
+            if (!retryPolicy.shouldRetry(new Context(errors))) {
+              shouldInsert = false;
+            }
+          }
+        }
+        if (shouldInsert) {
+          dataSize += tableContainer.addRow(row, insertIdList.get(i));
+        } else {
+          failedInserts.add(rowList.get(i));
+        }
       }
       return dataSize;
     }
@@ -200,6 +236,41 @@ class FakeDatasetService implements DatasetService, 
Serializable {
     }
   }
 
+  /**
+   * Cause a given {@link TableRow} object to fail when it's inserted. The 
errors in the list
+   * will be returned on subsequent retries, and the insert will succeed when 
the errors run out.
+   */
+  public void failOnInsert(
+      Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> 
insertErrors) {
+    synchronized (BigQueryIOTest.tables) {
+      for (Map.Entry<TableRow, List<TableDataInsertAllResponse.InsertErrors>> 
entry
+          : insertErrors.entrySet()) {
+        List<String> errorStrings = Lists.newArrayList();
+        for (TableDataInsertAllResponse.InsertErrors errors : 
entry.getValue()) {
+          errorStrings.add(BigQueryHelpers.toJsonString(errors));
+        }
+        this.insertErrors.put(BigQueryHelpers.toJsonString(entry.getKey()), 
errorStrings);
+      }
+    }
+  }
+
+  Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> 
getInsertErrors() {
+    Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> 
parsedInsertErrors =
+        Maps.newHashMap();
+    synchronized (BigQueryIOTest.tables) {
+      for (Map.Entry<String, List<String>> entry : 
this.insertErrors.entrySet()) {
+        TableRow tableRow = BigQueryHelpers.fromJsonString(entry.getKey(), 
TableRow.class);
+        List<TableDataInsertAllResponse.InsertErrors> allErrors = 
Lists.newArrayList();
+        for (String errorsString : entry.getValue()) {
+          allErrors.add(BigQueryHelpers.fromJsonString(
+              errorsString, TableDataInsertAllResponse.InsertErrors.class));
+        }
+        parsedInsertErrors.put(tableRow, allErrors);
+      }
+    }
+    return parsedInsertErrors;
+  }
+
   void throwNotFound(String format, Object... args) throws IOException {
     throw new IOException(
         new GoogleJsonResponseException.Builder(404,

http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java
new file mode 100644
index 0000000..b19835d
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link InsertRetryPolicy}.
+ */
+@RunWith(JUnit4.class)
+public class InsertRetryPolicyTest {
+  @Test
+  public void testNeverRetry() {
+    assertFalse(InsertRetryPolicy.neverRetry().shouldRetry(
+        new Context(new TableDataInsertAllResponse.InsertErrors())));
+  }
+
+  @Test
+  public void testAlwaysRetry() {
+    assertTrue(InsertRetryPolicy.alwaysRetry().shouldRetry(
+        new Context(new TableDataInsertAllResponse.InsertErrors())));
+  }
+
+  @Test
+  public void testDontRetryPersistentErrors() {
+    InsertRetryPolicy policy = InsertRetryPolicy.retryTransientErrors();
+    assertTrue(policy.shouldRetry(new Context(generateErrorAmongMany(
+        5, "timeout", "unavailable"))));
+    assertFalse(policy.shouldRetry(new Context(generateErrorAmongMany(
+        5, "timeout", "invalid"))));
+    assertFalse(policy.shouldRetry(new Context(generateErrorAmongMany(
+        5, "timeout", "invalidQuery"))));
+    assertFalse(policy.shouldRetry(new Context(generateErrorAmongMany(
+        5, "timeout", "notImplemented"))));
+  }
+
+  private TableDataInsertAllResponse.InsertErrors generateErrorAmongMany(
+      int numErrors, String baseReason, String exceptionalReason) {
+    // The retry policies are expected to search through the entire list of 
ErrorProtos to determine
+    // whether to retry. Stick the exceptionalReason in a random position to 
exercise this.
+    List<ErrorProto> errorProtos = 
Lists.newArrayListWithExpectedSize(numErrors);
+    int exceptionalPosition = ThreadLocalRandom.current().nextInt(numErrors);
+    for (int i = 0; i < numErrors; ++i) {
+      ErrorProto error = new ErrorProto();
+      error.setReason((i == exceptionalPosition) ? exceptionalReason : 
baseReason);
+      errorProtos.add(error);
+    }
+    TableDataInsertAllResponse.InsertErrors errors = new 
TableDataInsertAllResponse.InsertErrors();
+    errors.setErrors(errorProtos);
+    return errors;
+  }
+}

Reply via email to