This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9dc0f  Merge pull request #13055 from [BEAM-11006] BigQuery failsafe 
function
8b9dc0f is described below

commit 8b9dc0fef3a541a3502b5dba812da2cb05ab4a5b
Author: Dylan Hercher <[email protected]>
AuthorDate: Fri Oct 16 20:20:52 2020 -0700

    Merge pull request #13055 from [BEAM-11006] BigQuery failsafe function
    
    * Adding a Failsafe format function to BigQuery streaming writes
    
    * [BEAM-11006] Adding a Failsafe format function to BigQuery streaming 
writes
    
    * make FailsafeValueInSingleWindow public
    
    * missed failsafe
    
    * match BigQueryIO row writer T
    
    * clean extra PrepareWrite
    
    * linting cleanup
    
    * object already defined
    
    * linting
    
    * fixing testing to match new failsafe logic
    
    * cleaning tests to use failsafe logic
    
    * remove todo
    
    * use proper Failsafe value object
    
    * linting
    
    * format FailsafeValueInSingleWindow
    
    * fix test to validate on correct expected values
    
    * implement coder logic for failsafe element
    
    * clean up todo
    
    * cleanup todo
    
    * spotless
    
    * spotless
    
    * adding failsafevalueinsinglewindow testing
    
    * adding tests for failsafe
    
    * removing final todo, will add feature in separate PR/issue
    
    * change name to FormatRecordOnFailureFunction
    
    * linting
    
    * linting
    
    Co-authored-by: Dylan Hercher <[email protected]>
---
 .../sdk/values/FailsafeValueInSingleWindow.java    | 142 +++++++++++++++++++++
 .../FailsafeValueInSingleWindowCoderTest.java      |  64 ++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  69 +++++-----
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   3 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |   9 +-
 .../beam/sdk/io/gcp/bigquery/ErrorContainer.java   |  11 +-
 .../beam/sdk/io/gcp/bigquery/RowWriterFactory.java |  18 ++-
 .../beam/sdk/io/gcp/bigquery/StreamingInserts.java |  17 ++-
 .../beam/sdk/io/gcp/bigquery/StreamingWriteFn.java |  18 ++-
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  |  49 +++++--
 .../sdk/io/gcp/testing/FakeDatasetService.java     |  10 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |   3 +-
 .../io/gcp/bigquery/BigQueryServicesImplTest.java  |  42 ++++--
 .../beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java |   9 +-
 14 files changed, 379 insertions(+), 85 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/FailsafeValueInSingleWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/FailsafeValueInSingleWindow.java
new file mode 100644
index 0000000..9a1a8f9
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/FailsafeValueInSingleWindow.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * An immutable tuple of value, timestamp, window, and pane.
+ *
+ * @param <T> the type of the value
+ */
+@AutoValue
+@Internal
+public abstract class FailsafeValueInSingleWindow<T, ErrorT> {
+  /** Returns the value of this {@code FailsafeValueInSingleWindow}. */
+  public abstract @Nullable T getValue();
+
+  /** Returns the timestamp of this {@code FailsafeValueInSingleWindow}. */
+  public abstract Instant getTimestamp();
+
+  /** Returns the window of this {@code FailsafeValueInSingleWindow}. */
+  public abstract BoundedWindow getWindow();
+
+  /** Returns the pane of this {@code FailsafeValueInSingleWindow} in its 
window. */
+  public abstract PaneInfo getPane();
+
+  /** Returns the failsafe value of this {@code FailsafeValueInSingleWindow}. 
*/
+  public abstract @Nullable ErrorT getFailsafeValue();
+
+  public static <T, ErrorT> FailsafeValueInSingleWindow<T, ErrorT> of(
+      T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo, 
ErrorT failsafeValue) {
+    return new AutoValue_FailsafeValueInSingleWindow<>(
+        value, timestamp, window, paneInfo, failsafeValue);
+  }
+
+  /** A coder for {@link FailsafeValueInSingleWindow}. */
+  public static class Coder<T, ErrorT>
+      extends StructuredCoder<FailsafeValueInSingleWindow<T, ErrorT>> {
+    private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
+    private final org.apache.beam.sdk.coders.Coder<ErrorT> failsafeValueCoder;
+    private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
+
+    public static <T, ErrorT> Coder<T, ErrorT> of(
+        org.apache.beam.sdk.coders.Coder<T> valueCoder,
+        org.apache.beam.sdk.coders.Coder<ErrorT> failsafeValueCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) 
{
+      return new Coder<>(valueCoder, failsafeValueCoder, windowCoder);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Coder(
+        org.apache.beam.sdk.coders.Coder<T> valueCoder,
+        org.apache.beam.sdk.coders.Coder<ErrorT> failsafeValueCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) 
{
+      this.valueCoder = valueCoder;
+      this.failsafeValueCoder = failsafeValueCoder;
+      this.windowCoder = (org.apache.beam.sdk.coders.Coder) windowCoder;
+    }
+
+    @Override
+    public void encode(FailsafeValueInSingleWindow<T, ErrorT> windowedElem, 
OutputStream outStream)
+        throws IOException {
+      encode(windowedElem, outStream, Context.NESTED);
+    }
+
+    @Override
+    public void encode(
+        FailsafeValueInSingleWindow<T, ErrorT> windowedElem,
+        OutputStream outStream,
+        Context context)
+        throws IOException {
+      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
+      windowCoder.encode(windowedElem.getWindow(), outStream);
+      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), 
outStream);
+      valueCoder.encode(windowedElem.getValue(), outStream);
+      failsafeValueCoder.encode(windowedElem.getFailsafeValue(), outStream);
+    }
+
+    @Override
+    public FailsafeValueInSingleWindow<T, ErrorT> decode(InputStream inStream) 
throws IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
+    public FailsafeValueInSingleWindow<T, ErrorT> decode(InputStream inStream, 
Context context)
+        throws IOException {
+      Instant timestamp = InstantCoder.of().decode(inStream);
+      BoundedWindow window = windowCoder.decode(inStream);
+      PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
+      T value = valueCoder.decode(inStream);
+      ErrorT failsafeValue = failsafeValueCoder.decode(inStream);
+      return new AutoValue_FailsafeValueInSingleWindow<>(
+          value, timestamp, window, paneInfo, failsafeValue);
+    }
+
+    @Override
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> 
getCoderArguments() {
+      // Coder arguments are coders for the type parameters of the coder - 
i.e. T and ErrorT
+      return ImmutableList.of(valueCoder, failsafeValueCoder);
+    }
+
+    @Override
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() 
{
+      // Coder components are all inner coders that it uses - i.e. both T, 
ErrorT and BoundedWindow.
+      return ImmutableList.of(valueCoder, failsafeValueCoder, windowCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      valueCoder.verifyDeterministic();
+      failsafeValueCoder.verifyDeterministic();
+      windowCoder.verifyDeterministic();
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FailsafeValueInSingleWindowCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FailsafeValueInSingleWindowCoderTest.java
new file mode 100644
index 0000000..4c8237b
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FailsafeValueInSingleWindowCoderTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FailsafeValueInSingleWindow.Coder}. */
+@RunWith(JUnit4.class)
+public class FailsafeValueInSingleWindowCoderTest {
+  @Test
+  public void testDecodeEncodeEqual() throws Exception {
+    Instant now = Instant.now();
+    FailsafeValueInSingleWindow<String, String> value =
+        FailsafeValueInSingleWindow.of(
+            "foo",
+            now,
+            new IntervalWindow(now, now.plus(Duration.standardSeconds(10))),
+            PaneInfo.NO_FIRING,
+            "bar");
+
+    CoderProperties.coderDecodeEncodeEqual(
+        FailsafeValueInSingleWindow.Coder.of(
+            StringUtf8Coder.of(), StringUtf8Coder.of(), 
IntervalWindow.getCoder()),
+        value);
+  }
+
+  @Test
+  public void testCoderSerializable() throws Exception {
+    CoderProperties.coderSerializable(
+        FailsafeValueInSingleWindow.Coder.of(
+            StringUtf8Coder.of(), StringUtf8Coder.of(), 
IntervalWindow.getCoder()));
+  }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(
+        FailsafeValueInSingleWindow.Coder.of(
+            GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE, 
GlobalWindow.Coder.INSTANCE));
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2f22af3..36b03ec 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1723,6 +1723,8 @@ public class BigQueryIO {
 
     abstract @Nullable SerializableFunction<T, TableRow> getFormatFunction();
 
+    abstract @Nullable SerializableFunction<T, TableRow> 
getFormatRecordOnFailureFunction();
+
     abstract RowWriterFactory.@Nullable AvroRowWriterFactory<T, ?, ?> 
getAvroRowWriterFactory();
 
     abstract @Nullable SerializableFunction<TableSchema, 
org.apache.avro.Schema>
@@ -1798,6 +1800,9 @@ public class BigQueryIO {
 
       abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> 
formatFunction);
 
+      abstract Builder<T> setFormatRecordOnFailureFunction(
+          SerializableFunction<T, TableRow> formatFunction);
+
       abstract Builder<T> setAvroRowWriterFactory(
           RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
 
@@ -2003,6 +2008,16 @@ public class BigQueryIO {
     }
 
     /**
+     * If an insert failure occurs, this function is applied to the originally 
supplied row T. The
+     * resulting {@link TableRow} will be accessed via {@link
+     * WriteResult#getFailedInsertsWithErr()}.
+     */
+    public Write<T> withFormatRecordOnFailureFunction(
+        SerializableFunction<T, TableRow> formatFunction) {
+      return 
toBuilder().setFormatRecordOnFailureFunction(formatFunction).build();
+    }
+
+    /**
      * Formats the user's type into a {@link GenericRecord} to be written to 
BigQuery. The
      * GenericRecords are written as avro using the standard {@link 
GenericDatumWriter}.
      *
@@ -2521,6 +2536,8 @@ public class BigQueryIO {
         PCollection<T> input, DynamicDestinations<T, DestinationT> 
dynamicDestinations) {
       boolean optimizeWrites = getOptimizeWrites();
       SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
+      SerializableFunction<T, TableRow> formatRecordOnFailureFunction =
+          getFormatRecordOnFailureFunction();
       RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT> 
avroRowWriterFactory =
           (RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>) 
getAvroRowWriterFactory();
 
@@ -2566,8 +2583,8 @@ public class BigQueryIO {
       }
 
       Method method = resolveMethod(input);
+      RowWriterFactory<T, DestinationT> rowWriterFactory;
       if (optimizeWrites) {
-        RowWriterFactory<T, DestinationT> rowWriterFactory;
         if (avroRowWriterFactory != null) {
           checkArgument(
               formatFunction == null,
@@ -2584,7 +2601,8 @@ public class BigQueryIO {
           }
           rowWriterFactory = avroRowWriterFactory.prepare(dynamicDestinations, 
avroSchemaFactory);
         } else if (formatFunction != null) {
-          rowWriterFactory = RowWriterFactory.tableRows(formatFunction);
+          rowWriterFactory =
+              RowWriterFactory.tableRows(formatFunction, 
formatRecordOnFailureFunction);
         } else {
           throw new IllegalArgumentException(
               "A function must be provided to convert the input type into a 
TableRow or "
@@ -2592,20 +2610,6 @@ public class BigQueryIO {
                   + "BigQueryIO.Write.withAvroFormatFunction to provide a 
formatting function. "
                   + "A format function is not required if Beam schemas are 
used.");
         }
-
-        PCollection<KV<DestinationT, T>> rowsWithDestination =
-            input
-                .apply(
-                    "PrepareWrite",
-                    new PrepareWrite<>(dynamicDestinations, 
SerializableFunctions.identity()))
-                .setCoder(KvCoder.of(destinationCoder, input.getCoder()));
-        return continueExpandTyped(
-            rowsWithDestination,
-            input.getCoder(),
-            destinationCoder,
-            dynamicDestinations,
-            rowWriterFactory,
-            method);
       } else {
         checkArgument(avroRowWriterFactory == null);
         checkArgument(
@@ -2615,22 +2619,22 @@ public class BigQueryIO {
                 + "BigQueryIO.Write.withAvroFormatFunction to provide a 
formatting function. "
                 + "A format function is not required if Beam schemas are 
used.");
 
-        PCollection<KV<DestinationT, TableRow>> rowsWithDestination =
-            input
-                .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, 
formatFunction))
-                .setCoder(KvCoder.of(destinationCoder, 
TableRowJsonCoder.of()));
-
-        RowWriterFactory<TableRow, DestinationT> rowWriterFactory =
-            RowWriterFactory.tableRows(SerializableFunctions.identity());
-
-        return continueExpandTyped(
-            rowsWithDestination,
-            TableRowJsonCoder.of(),
-            destinationCoder,
-            dynamicDestinations,
-            rowWriterFactory,
-            method);
+        rowWriterFactory =
+            RowWriterFactory.tableRows(formatFunction, 
formatRecordOnFailureFunction);
       }
+      PCollection<KV<DestinationT, T>> rowsWithDestination =
+          input
+              .apply(
+                  "PrepareWrite",
+                  new PrepareWrite<>(dynamicDestinations, 
SerializableFunctions.identity()))
+              .setCoder(KvCoder.of(destinationCoder, input.getCoder()));
+      return continueExpandTyped(
+          rowsWithDestination,
+          input.getCoder(),
+          destinationCoder,
+          dynamicDestinations,
+          rowWriterFactory,
+          method);
     }
 
     private <DestinationT, ElementT> WriteResult continueExpandTyped(
@@ -2659,7 +2663,8 @@ public class BigQueryIO {
                     getCreateDisposition(),
                     dynamicDestinations,
                     elementCoder,
-                    tableRowWriterFactory.getToRowFn())
+                    tableRowWriterFactory.getToRowFn(),
+                    tableRowWriterFactory.getToFailsafeRowFn())
                 .withInsertRetryPolicy(retryPolicy)
                 .withTestServices(getBigQueryServices())
                 .withExtendedErrorInfo(getExtendedErrorInfo())
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 12f0029..f6894ca 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.util.Histogram;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -154,7 +155,7 @@ public interface BigQueryServices extends Serializable {
      */
     <T> long insertAll(
         TableReference ref,
-        List<ValueInSingleWindow<TableRow>> rowList,
+        List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
         @Nullable List<String> insertIdList,
         InsertRetryPolicy retryPolicy,
         List<ValueInSingleWindow<T>> failedInserts,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 5935a76..bff771f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -92,6 +92,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Histogram;
 import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -752,7 +753,7 @@ class BigQueryServicesImpl implements BigQueryServices {
     @VisibleForTesting
     <T> long insertAll(
         TableReference ref,
-        List<ValueInSingleWindow<TableRow>> rowList,
+        List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
         @Nullable List<String> insertIdList,
         BackOff backoff,
         FluentBackoff rateLimitBackoffFactory,
@@ -781,13 +782,13 @@ class BigQueryServicesImpl implements BigQueryServices {
       List<TableDataInsertAllResponse.InsertErrors> allErrors = new 
ArrayList<>();
       // These lists contain the rows to publish. Initially the contain the 
entire list.
       // If there are failures, they will contain only the failed rows to be 
retried.
-      List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList;
+      List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowsToPublish = 
rowList;
       List<String> idsToPublish = null;
       if (!ignoreInsertIds) {
         idsToPublish = insertIdList;
       }
       while (true) {
-        List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>();
+        List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new 
ArrayList<>();
         List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : 
null;
 
         int strideIndex = 0;
@@ -944,7 +945,7 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public <T> long insertAll(
         TableReference ref,
-        List<ValueInSingleWindow<TableRow>> rowList,
+        List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
         @Nullable List<String> insertIdList,
         InsertRetryPolicy retryPolicy,
         List<ValueInSingleWindow<T>> failedInserts,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
index 8b05bd0..7674d68 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
@@ -34,14 +35,18 @@ public interface ErrorContainer<T> extends Serializable {
       List<ValueInSingleWindow<T>> failedInserts,
       TableDataInsertAllResponse.InsertErrors error,
       TableReference ref,
-      ValueInSingleWindow<TableRow> tableRow);
+      FailsafeValueInSingleWindow<TableRow, TableRow> tableRow);
 
   ErrorContainer<TableRow> TABLE_ROW_ERROR_CONTAINER =
-      (failedInserts, error, ref, tableRow) -> failedInserts.add(tableRow);
+      (failedInserts, error, ref, tableRow) ->
+          failedInserts.add(
+              ValueInSingleWindow.of(
+                  tableRow.getFailsafeValue(), tableRow.getTimestamp(),
+                  tableRow.getWindow(), tableRow.getPane()));
 
   ErrorContainer<BigQueryInsertError> BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER =
       (failedInserts, error, ref, tableRow) -> {
-        BigQueryInsertError err = new BigQueryInsertError(tableRow.getValue(), 
error, ref);
+        BigQueryInsertError err = new 
BigQueryInsertError(tableRow.getFailsafeValue(), error, ref);
         failedInserts.add(
             ValueInSingleWindow.of(
                 err, tableRow.getTimestamp(), tableRow.getWindow(), 
tableRow.getPane()));
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
index 7229957..a205b3c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
@@ -40,23 +40,35 @@ abstract class RowWriterFactory<ElementT, DestinationT> 
implements Serializable
       String tempFilePrefix, DestinationT destination) throws Exception;
 
   static <ElementT, DestinationT> RowWriterFactory<ElementT, DestinationT> 
tableRows(
-      SerializableFunction<ElementT, TableRow> toRow) {
-    return new TableRowWriterFactory<ElementT, DestinationT>(toRow);
+      SerializableFunction<ElementT, TableRow> toRow,
+      SerializableFunction<ElementT, TableRow> toFailsafeRow) {
+    return new TableRowWriterFactory<ElementT, DestinationT>(toRow, 
toFailsafeRow);
   }
 
   static final class TableRowWriterFactory<ElementT, DestinationT>
       extends RowWriterFactory<ElementT, DestinationT> {
 
     private final SerializableFunction<ElementT, TableRow> toRow;
+    private final SerializableFunction<ElementT, TableRow> toFailsafeRow;
 
-    private TableRowWriterFactory(SerializableFunction<ElementT, TableRow> 
toRow) {
+    private TableRowWriterFactory(
+        SerializableFunction<ElementT, TableRow> toRow,
+        SerializableFunction<ElementT, TableRow> toFailsafeRow) {
       this.toRow = toRow;
+      this.toFailsafeRow = toFailsafeRow;
     }
 
     public SerializableFunction<ElementT, TableRow> getToRowFn() {
       return toRow;
     }
 
+    public SerializableFunction<ElementT, TableRow> getToFailsafeRowFn() {
+      if (toFailsafeRow == null) {
+        return toRow;
+      }
+      return toFailsafeRow;
+    }
+
     @Override
     public OutputType getOutputType() {
       return OutputType.JsonTableRow;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index d00adbb..22415a6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -42,13 +42,15 @@ public class StreamingInserts<DestinationT, ElementT>
   private final String kmsKey;
   private final Coder<ElementT> elementCoder;
   private final SerializableFunction<ElementT, TableRow> toTableRow;
+  private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
 
   /** Constructor. */
   public StreamingInserts(
       CreateDisposition createDisposition,
       DynamicDestinations<?, DestinationT> dynamicDestinations,
       Coder<ElementT> elementCoder,
-      SerializableFunction<ElementT, TableRow> toTableRow) {
+      SerializableFunction<ElementT, TableRow> toTableRow,
+      SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
     this(
         createDisposition,
         dynamicDestinations,
@@ -60,6 +62,7 @@ public class StreamingInserts<DestinationT, ElementT>
         false,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         null);
   }
 
@@ -75,6 +78,7 @@ public class StreamingInserts<DestinationT, ElementT>
       boolean ignoreInsertIds,
       Coder<ElementT> elementCoder,
       SerializableFunction<ElementT, TableRow> toTableRow,
+      SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
       String kmsKey) {
     this.createDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
@@ -86,6 +90,7 @@ public class StreamingInserts<DestinationT, ElementT>
     this.ignoreInsertIds = ignoreInsertIds;
     this.elementCoder = elementCoder;
     this.toTableRow = toTableRow;
+    this.toFailsafeTableRow = toFailsafeTableRow;
     this.kmsKey = kmsKey;
   }
 
@@ -103,6 +108,7 @@ public class StreamingInserts<DestinationT, ElementT>
         ignoreInsertIds,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         kmsKey);
   }
 
@@ -119,6 +125,7 @@ public class StreamingInserts<DestinationT, ElementT>
         ignoreInsertIds,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         kmsKey);
   }
 
@@ -134,6 +141,7 @@ public class StreamingInserts<DestinationT, ElementT>
         ignoreInsertIds,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         kmsKey);
   }
 
@@ -149,6 +157,7 @@ public class StreamingInserts<DestinationT, ElementT>
         ignoreInsertIds,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         kmsKey);
   }
 
@@ -164,6 +173,7 @@ public class StreamingInserts<DestinationT, ElementT>
         ignoreInsertIds,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         kmsKey);
   }
 
@@ -179,6 +189,7 @@ public class StreamingInserts<DestinationT, ElementT>
         ignoreInsertIds,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         kmsKey);
   }
 
@@ -194,6 +205,7 @@ public class StreamingInserts<DestinationT, ElementT>
         ignoreInsertIds,
         elementCoder,
         toTableRow,
+        toFailsafeTableRow,
         kmsKey);
   }
 
@@ -215,6 +227,7 @@ public class StreamingInserts<DestinationT, ElementT>
             .withIgnoreUnknownValues(ignoreUnknownValues)
             .withIgnoreInsertIds(ignoreInsertIds)
             .withElementCoder(elementCoder)
-            .withToTableRow(toTableRow));
+            .withToTableRow(toTableRow)
+            .withToFailsafeTableRow(toFailsafeTableRow));
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index a956f0d..95e6467 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.Histogram;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
@@ -58,9 +59,10 @@ class StreamingWriteFn<ErrorT, ElementT>
   private final boolean ignoreUnknownValues;
   private final boolean ignoreInsertIds;
   private final SerializableFunction<ElementT, TableRow> toTableRow;
+  private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
 
   /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
-  private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;
+  private transient Map<String, List<FailsafeValueInSingleWindow<TableRow, 
TableRow>>> tableRows;
 
   /** The list of unique ids for each BigQuery table row. */
   private transient Map<String, List<String>> uniqueIdsForTableRows;
@@ -79,7 +81,8 @@ class StreamingWriteFn<ErrorT, ElementT>
       boolean skipInvalidRows,
       boolean ignoreUnknownValues,
       boolean ignoreInsertIds,
-      SerializableFunction<ElementT, TableRow> toTableRow) {
+      SerializableFunction<ElementT, TableRow> toTableRow,
+      SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
     this.bqServices = bqServices;
     this.retryPolicy = retryPolicy;
     this.failedOutputTag = failedOutputTag;
@@ -88,6 +91,7 @@ class StreamingWriteFn<ErrorT, ElementT>
     this.ignoreUnknownValues = ignoreUnknownValues;
     this.ignoreInsertIds = ignoreInsertIds;
     this.toTableRow = toTableRow;
+    this.toFailsafeTableRow = toFailsafeTableRow;
   }
 
   @Setup
@@ -120,13 +124,14 @@ class StreamingWriteFn<ErrorT, ElementT>
       BoundedWindow window,
       PaneInfo pane) {
     String tableSpec = element.getKey().getKey();
-    List<ValueInSingleWindow<TableRow>> rows =
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec);
     List<String> uniqueIds =
        BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec);
 
     TableRow tableRow = toTableRow.apply(element.getValue().tableRow);
-    rows.add(ValueInSingleWindow.of(tableRow, timestamp, window, pane));
+    TableRow failsafeTableRow = toFailsafeTableRow.apply(element.getValue().tableRow);
+    rows.add(FailsafeValueInSingleWindow.of(tableRow, timestamp, window, pane, failsafeTableRow));
     uniqueIds.add(element.getValue().uniqueId);
   }
 
@@ -135,7 +140,8 @@ class StreamingWriteFn<ErrorT, ElementT>
   public void finishBundle(FinishBundleContext context) throws Exception {
     List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
    BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-    for (Map.Entry<String, List<ValueInSingleWindow<TableRow>>> entry : tableRows.entrySet()) {
+    for (Map.Entry<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>> entry :
+        tableRows.entrySet()) {
      TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
       flushRows(
           tableReference,
@@ -173,7 +179,7 @@ class StreamingWriteFn<ErrorT, ElementT>
   /** Writes the accumulated rows into BigQuery with streaming API. */
   private void flushRows(
       TableReference tableReference,
-      List<ValueInSingleWindow<TableRow>> tableRows,
+      List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows,
       List<String> uniqueIds,
       BigQueryOptions options,
       List<ValueInSingleWindow<ErrorT>> failedInserts)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 81f097a..5940415 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -56,6 +56,7 @@ public class StreamingWriteTables<ElementT>
   private final boolean ignoreInsertIds;
   private final Coder<ElementT> elementCoder;
   private final SerializableFunction<ElementT, TableRow> toTableRow;
+  private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
 
   public StreamingWriteTables() {
     this(
@@ -66,7 +67,8 @@ public class StreamingWriteTables<ElementT>
         false, // ignoreUnknownValues
         false, // ignoreInsertIds
         null, // elementCoder
-        null); // toTableRow
+        null, // toTableRow
+        null); // toFailsafeTableRow
   }
 
   private StreamingWriteTables(
@@ -77,7 +79,8 @@ public class StreamingWriteTables<ElementT>
       boolean ignoreUnknownValues,
       boolean ignoreInsertIds,
       Coder<ElementT> elementCoder,
-      SerializableFunction<ElementT, TableRow> toTableRow) {
+      SerializableFunction<ElementT, TableRow> toTableRow,
+      SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
     this.bigQueryServices = bigQueryServices;
     this.retryPolicy = retryPolicy;
     this.extendedErrorInfo = extendedErrorInfo;
@@ -86,6 +89,7 @@ public class StreamingWriteTables<ElementT>
     this.ignoreInsertIds = ignoreInsertIds;
     this.elementCoder = elementCoder;
     this.toTableRow = toTableRow;
+    this.toFailsafeTableRow = toFailsafeTableRow;
   }
 
  StreamingWriteTables<ElementT> withTestServices(BigQueryServices bigQueryServices) {
@@ -97,7 +101,8 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
   }
 
  StreamingWriteTables<ElementT> withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
@@ -109,7 +114,8 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
   }
 
  StreamingWriteTables<ElementT> withExtendedErrorInfo(boolean extendedErrorInfo) {
@@ -121,7 +127,8 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
   }
 
   StreamingWriteTables<ElementT> withSkipInvalidRows(boolean skipInvalidRows) {
@@ -133,7 +140,8 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
   }
 
  StreamingWriteTables<ElementT> withIgnoreUnknownValues(boolean ignoreUnknownValues) {
@@ -145,7 +153,8 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
   }
 
   StreamingWriteTables<ElementT> withIgnoreInsertIds(boolean ignoreInsertIds) {
@@ -157,7 +166,8 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
   }
 
  StreamingWriteTables<ElementT> withElementCoder(Coder<ElementT> elementCoder) {
@@ -169,7 +179,8 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
   }
 
   StreamingWriteTables<ElementT> withToTableRow(
@@ -182,7 +193,22 @@ public class StreamingWriteTables<ElementT>
         ignoreUnknownValues,
         ignoreInsertIds,
         elementCoder,
-        toTableRow);
+        toTableRow,
+        toFailsafeTableRow);
+  }
+
+  StreamingWriteTables<ElementT> withToFailsafeTableRow(
+      SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
+    return new StreamingWriteTables<>(
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues,
+        ignoreInsertIds,
+        elementCoder,
+        toTableRow,
+        toFailsafeTableRow);
   }
 
   @Override
@@ -264,7 +290,8 @@ public class StreamingWriteTables<ElementT>
                             skipInvalidRows,
                             ignoreUnknownValues,
                             ignoreInsertIds,
-                            toTableRow))
+                            toTableRow,
+                            toFailsafeTableRow))
                    .withOutputTags(mainOutputTag, TupleTagList.of(failedInsertsTag)));
     PCollection<T> failedInserts = tuple.get(failedInsertsTag);
     failedInserts.setCoder(coder);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 963edb4..149e068 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -213,14 +214,15 @@ public class FakeDatasetService implements DatasetService, Serializable {
   public long insertAll(
      TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
       throws IOException, InterruptedException {
-    List<ValueInSingleWindow<TableRow>> windowedRows = Lists.newArrayList();
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> windowedRows = Lists.newArrayList();
     for (TableRow row : rowList) {
       windowedRows.add(
-          ValueInSingleWindow.of(
+          FailsafeValueInSingleWindow.of(
               row,
               GlobalWindow.TIMESTAMP_MAX_VALUE,
               GlobalWindow.INSTANCE,
-              PaneInfo.ON_TIME_AND_ONLY_FIRING));
+              PaneInfo.ON_TIME_AND_ONLY_FIRING,
+              row));
     }
     return insertAll(
         ref,
@@ -237,7 +239,7 @@ public class FakeDatasetService implements DatasetService, 
Serializable {
   @Override
   public <T> long insertAll(
       TableReference ref,
-      List<ValueInSingleWindow<TableRow>> rowList,
+      List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
       @Nullable List<String> insertIdList,
       InsertRetryPolicy retryPolicy,
       List<ValueInSingleWindow<T>> failedInserts,
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 4489139..d22b04e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1556,7 +1556,8 @@ public class BigQueryIOWriteTest implements Serializable {
             BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION,
             multiPartitionsTag,
             singlePartitionTag,
-            RowWriterFactory.tableRows(SerializableFunctions.identity()));
+            RowWriterFactory.tableRows(
+                SerializableFunctions.identity(), SerializableFunctions.identity()));
 
     DoFnTester<
             Iterable<WriteBundlesToFiles.Result<TableDestination>>,
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 15bfc0a..a27cdbb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -78,6 +78,7 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -483,7 +484,16 @@ public class BigQueryServicesImplTest {
     verify(response, times(1)).getContentType();
   }
 
-  private <T> ValueInSingleWindow<T> wrapValue(T value) {
+  private <T> FailsafeValueInSingleWindow<T, T> wrapValue(T value) {
+    return FailsafeValueInSingleWindow.of(
+        value,
+        GlobalWindow.TIMESTAMP_MAX_VALUE,
+        GlobalWindow.INSTANCE,
+        PaneInfo.ON_TIME_AND_ONLY_FIRING,
+        value);
+  }
+
+  private <T> ValueInSingleWindow<T> wrapErrorValue(T value) {
     return ValueInSingleWindow.of(
         value,
         GlobalWindow.TIMESTAMP_MAX_VALUE,
@@ -496,7 +506,7 @@ public class BigQueryServicesImplTest {
   public void testInsertRateLimitRetry() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new ArrayList<>();
     rows.add(wrapValue(new TableRow()));
 
     // First response is 403 rate limited, second response has valid payload.
@@ -532,7 +542,7 @@ public class BigQueryServicesImplTest {
   public void testInsertQuotaExceededRetry() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new ArrayList<>();
     rows.add(wrapValue(new TableRow()));
 
     // First response is 403 quota exceeded, second response has valid payload.
@@ -568,7 +578,7 @@ public class BigQueryServicesImplTest {
   public void testInsertStoppedRetry() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new ArrayList<>();
     rows.add(wrapValue(new TableRow()));
 
     // Respond 403 four times, then valid payload.
@@ -620,7 +630,7 @@ public class BigQueryServicesImplTest {
   public void testInsertRetrySelectRows() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows =
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(
             wrapValue(new TableRow().set("row", "a")), wrapValue(new 
TableRow().set("row", "b")));
     List<String> insertIds = ImmutableList.of("a", "b");
@@ -664,7 +674,7 @@ public class BigQueryServicesImplTest {
   public void testInsertFailsGracefully() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows =
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
 
     final TableDataInsertAllResponse row1Failed =
@@ -723,7 +733,7 @@ public class BigQueryServicesImplTest {
   public void testFailInsertOtherRetry() throws Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new ArrayList<>();
     rows.add(wrapValue(new TableRow()));
 
     // First response is 403 non-{rate-limited, quota-exceeded}, second 
response has valid payload
@@ -767,7 +777,7 @@ public class BigQueryServicesImplTest {
   public void testInsertRetryPolicy() throws InterruptedException, IOException 
{
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows =
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
 
     // First time row0 fails with a retryable error, and row1 fails with a 
persistent error.
@@ -836,7 +846,7 @@ public class BigQueryServicesImplTest {
       throws InterruptedException, IOException {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows =
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
 
     final TableDataInsertAllResponse allRowsSucceeded = new 
TableDataInsertAllResponse();
@@ -1051,10 +1061,14 @@ public class BigQueryServicesImplTest {
   public void testSimpleErrorRetrieval() throws InterruptedException, 
IOException {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows =
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(
             wrapValue(new TableRow().set("a", 1)), wrapValue(new 
TableRow().set("b", 2)));
 
+    final List<ValueInSingleWindow<TableRow>> expected =
+        ImmutableList.of(
+            wrapErrorValue(new TableRow().set("a", 1)), wrapErrorValue(new 
TableRow().set("b", 2)));
+
     final TableDataInsertAllResponse failures =
         new TableDataInsertAllResponse()
             .setInsertErrors(
@@ -1090,7 +1104,7 @@ public class BigQueryServicesImplTest {
         false,
         false);
 
-    assertThat(failedInserts, is(rows));
+    assertThat(failedInserts, is(expected));
   }
 
   /** Tests that {@link DatasetServiceImpl#insertAll} uses the supplied {@link 
ErrorContainer}. */
@@ -1098,7 +1112,7 @@ public class BigQueryServicesImplTest {
   public void testExtendedErrorRetrieval() throws InterruptedException, 
IOException {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<ValueInSingleWindow<TableRow>> rows =
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(
             wrapValue(new TableRow().set("a", 1)), wrapValue(new 
TableRow().set("b", 2)));
 
@@ -1115,10 +1129,10 @@ public class BigQueryServicesImplTest {
 
     final List<ValueInSingleWindow<BigQueryInsertError>> expected =
         ImmutableList.of(
-            wrapValue(
+            wrapErrorValue(
                 new BigQueryInsertError(
                     rows.get(0).getValue(), failures.getInsertErrors().get(0), 
ref)),
-            wrapValue(
+            wrapErrorValue(
                 new BigQueryInsertError(
                     rows.get(1).getValue(), failures.getInsertErrors().get(1), 
ref)));
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index deb1952..b433254 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -196,15 +196,16 @@ public class BigQueryUtilTest {
     TableReference ref = 
BigQueryHelpers.parseTableSpec("project:dataset.table");
     DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, 
options, 5);
 
-    List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new ArrayList<>();
     List<String> ids = new ArrayList<>();
     for (int i = 0; i < 25; ++i) {
       rows.add(
-          ValueInSingleWindow.of(
+          FailsafeValueInSingleWindow.of(
               rawRow("foo", 1234),
               GlobalWindow.TIMESTAMP_MAX_VALUE,
               GlobalWindow.INSTANCE,
-              PaneInfo.ON_TIME_AND_ONLY_FIRING));
+              PaneInfo.ON_TIME_AND_ONLY_FIRING,
+              rawRow("foo", 1234)));
       ids.add("");
     }
 

Reply via email to