This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8b9dc0f Merge pull request #13055 from [BEAM-11006] BigQuery failsafe
function
8b9dc0f is described below
commit 8b9dc0fef3a541a3502b5dba812da2cb05ab4a5b
Author: Dylan Hercher <[email protected]>
AuthorDate: Fri Oct 16 20:20:52 2020 -0700
Merge pull request #13055 from [BEAM-11006] BigQuery failsafe function
* Adding a Failsafe format function to BigQuery streaming writes
* [BEAM-11006] Adding a Failsafe format function to BigQuery streaming
writes
* make FailsafeValueInSingleWindow public
* missed failsafe
* match BigQueryIO row writer T
* clean extra PrepareWrite
* linting cleanup
* object already defined
* linting
* fixing testing to match new failsafe logic
* cleaning tests to use failsafe logic
* remove todo
* use proper Failsafe value object
* linting
* format FailsafeValueInSingleWindow
* fix test to validate on correct expected values
* implement coder logic for failsafe element
* clean up todo
* cleanup todo
* spotless
* spotless
* adding failsafevalueinsinglewindow testing
* adding tests for failsafe
* removing final todo, will add feature in separate PR/issue
* change name to FormatRecordOnFailureFunction
* linting
* linting
Co-authored-by: Dylan Hercher <[email protected]>
---
.../sdk/values/FailsafeValueInSingleWindow.java | 142 +++++++++++++++++++++
.../FailsafeValueInSingleWindowCoderTest.java | 64 ++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 69 +++++-----
.../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 3 +-
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 9 +-
.../beam/sdk/io/gcp/bigquery/ErrorContainer.java | 11 +-
.../beam/sdk/io/gcp/bigquery/RowWriterFactory.java | 18 ++-
.../beam/sdk/io/gcp/bigquery/StreamingInserts.java | 17 ++-
.../beam/sdk/io/gcp/bigquery/StreamingWriteFn.java | 18 ++-
.../sdk/io/gcp/bigquery/StreamingWriteTables.java | 49 +++++--
.../sdk/io/gcp/testing/FakeDatasetService.java | 10 +-
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 3 +-
.../io/gcp/bigquery/BigQueryServicesImplTest.java | 42 ++++--
.../beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java | 9 +-
14 files changed, 379 insertions(+), 85 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/FailsafeValueInSingleWindow.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/FailsafeValueInSingleWindow.java
new file mode 100644
index 0000000..9a1a8f9
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/FailsafeValueInSingleWindow.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * An immutable tuple of value, failsafe value, timestamp, window, and pane.
+ *
+ * @param <T> the type of the value
+ * @param <ErrorT> the type of the failsafe value emitted on insert failure
+ */
+@AutoValue
+@Internal
+public abstract class FailsafeValueInSingleWindow<T, ErrorT> {
+ /** Returns the value of this {@code FailsafeValueInSingleWindow}. */
+ public abstract @Nullable T getValue();
+
+ /** Returns the timestamp of this {@code FailsafeValueInSingleWindow}. */
+ public abstract Instant getTimestamp();
+
+ /** Returns the window of this {@code FailsafeValueInSingleWindow}. */
+ public abstract BoundedWindow getWindow();
+
+ /** Returns the pane of this {@code FailsafeValueInSingleWindow} in its
window. */
+ public abstract PaneInfo getPane();
+
+ /** Returns the failsafe value of this {@code FailsafeValueInSingleWindow}.
*/
+ public abstract @Nullable ErrorT getFailsafeValue();
+
+ public static <T, ErrorT> FailsafeValueInSingleWindow<T, ErrorT> of(
+ T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo,
ErrorT failsafeValue) {
+ return new AutoValue_FailsafeValueInSingleWindow<>(
+ value, timestamp, window, paneInfo, failsafeValue);
+ }
+
+ /** A coder for {@link FailsafeValueInSingleWindow}. */
+ public static class Coder<T, ErrorT>
+ extends StructuredCoder<FailsafeValueInSingleWindow<T, ErrorT>> {
+ private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
+ private final org.apache.beam.sdk.coders.Coder<ErrorT> failsafeValueCoder;
+ private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
+
+ public static <T, ErrorT> Coder<T, ErrorT> of(
+ org.apache.beam.sdk.coders.Coder<T> valueCoder,
+ org.apache.beam.sdk.coders.Coder<ErrorT> failsafeValueCoder,
+ org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder)
{
+ return new Coder<>(valueCoder, failsafeValueCoder, windowCoder);
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Coder(
+ org.apache.beam.sdk.coders.Coder<T> valueCoder,
+ org.apache.beam.sdk.coders.Coder<ErrorT> failsafeValueCoder,
+ org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder)
{
+ this.valueCoder = valueCoder;
+ this.failsafeValueCoder = failsafeValueCoder;
+ this.windowCoder = (org.apache.beam.sdk.coders.Coder) windowCoder;
+ }
+
+ @Override
+ public void encode(FailsafeValueInSingleWindow<T, ErrorT> windowedElem,
OutputStream outStream)
+ throws IOException {
+ encode(windowedElem, outStream, Context.NESTED);
+ }
+
+ @Override
+ public void encode(
+ FailsafeValueInSingleWindow<T, ErrorT> windowedElem,
+ OutputStream outStream,
+ Context context)
+ throws IOException {
+ InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
+ windowCoder.encode(windowedElem.getWindow(), outStream);
+ PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(),
outStream);
+ valueCoder.encode(windowedElem.getValue(), outStream);
+ failsafeValueCoder.encode(windowedElem.getFailsafeValue(), outStream);
+ }
+
+ @Override
+ public FailsafeValueInSingleWindow<T, ErrorT> decode(InputStream inStream)
throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
+ public FailsafeValueInSingleWindow<T, ErrorT> decode(InputStream inStream,
Context context)
+ throws IOException {
+ Instant timestamp = InstantCoder.of().decode(inStream);
+ BoundedWindow window = windowCoder.decode(inStream);
+ PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
+ T value = valueCoder.decode(inStream);
+ ErrorT failsafeValue = failsafeValueCoder.decode(inStream);
+ return new AutoValue_FailsafeValueInSingleWindow<>(
+ value, timestamp, window, paneInfo, failsafeValue);
+ }
+
+ @Override
+ public List<? extends org.apache.beam.sdk.coders.Coder<?>>
getCoderArguments() {
+ // Coder arguments are coders for the type parameters of the coder -
i.e. T and ErrorT
+ return ImmutableList.of(valueCoder, failsafeValueCoder);
+ }
+
+ @Override
+ public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents()
{
+ // Coder components are all inner coders that it uses - i.e. both T,
ErrorT and BoundedWindow.
+ return ImmutableList.of(valueCoder, failsafeValueCoder, windowCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ valueCoder.verifyDeterministic();
+ failsafeValueCoder.verifyDeterministic();
+ windowCoder.verifyDeterministic();
+ }
+ }
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FailsafeValueInSingleWindowCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FailsafeValueInSingleWindowCoderTest.java
new file mode 100644
index 0000000..4c8237b
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FailsafeValueInSingleWindowCoderTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FailsafeValueInSingleWindow.Coder}. */
+@RunWith(JUnit4.class)
+public class FailsafeValueInSingleWindowCoderTest {
+ @Test
+ public void testDecodeEncodeEqual() throws Exception {
+ Instant now = Instant.now();
+ FailsafeValueInSingleWindow<String, String> value =
+ FailsafeValueInSingleWindow.of(
+ "foo",
+ now,
+ new IntervalWindow(now, now.plus(Duration.standardSeconds(10))),
+ PaneInfo.NO_FIRING,
+ "bar");
+
+ CoderProperties.coderDecodeEncodeEqual(
+ FailsafeValueInSingleWindow.Coder.of(
+ StringUtf8Coder.of(), StringUtf8Coder.of(),
IntervalWindow.getCoder()),
+ value);
+ }
+
+ @Test
+ public void testCoderSerializable() throws Exception {
+ CoderProperties.coderSerializable(
+ FailsafeValueInSingleWindow.Coder.of(
+ StringUtf8Coder.of(), StringUtf8Coder.of(),
IntervalWindow.getCoder()));
+ }
+
+ @Test
+ public void testCoderIsSerializableWithWellKnownCoderType() {
+ CoderProperties.coderSerializable(
+ FailsafeValueInSingleWindow.Coder.of(
+ GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE,
GlobalWindow.Coder.INSTANCE));
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2f22af3..36b03ec 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1723,6 +1723,8 @@ public class BigQueryIO {
abstract @Nullable SerializableFunction<T, TableRow> getFormatFunction();
+ abstract @Nullable SerializableFunction<T, TableRow>
getFormatRecordOnFailureFunction();
+
abstract RowWriterFactory.@Nullable AvroRowWriterFactory<T, ?, ?>
getAvroRowWriterFactory();
abstract @Nullable SerializableFunction<TableSchema,
org.apache.avro.Schema>
@@ -1798,6 +1800,9 @@ public class BigQueryIO {
abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow>
formatFunction);
+ abstract Builder<T> setFormatRecordOnFailureFunction(
+ SerializableFunction<T, TableRow> formatFunction);
+
abstract Builder<T> setAvroRowWriterFactory(
RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
@@ -2003,6 +2008,16 @@ public class BigQueryIO {
}
/**
+ * If an insert failure occurs, this function is applied to the originally
supplied row T. The
+ * resulting {@link TableRow} will be accessed via {@link
+ * WriteResult#getFailedInsertsWithErr()}.
+ */
+ public Write<T> withFormatRecordOnFailureFunction(
+ SerializableFunction<T, TableRow> formatFunction) {
+ return
toBuilder().setFormatRecordOnFailureFunction(formatFunction).build();
+ }
+
+ /**
* Formats the user's type into a {@link GenericRecord} to be written to
BigQuery. The
* GenericRecords are written as avro using the standard {@link
GenericDatumWriter}.
*
@@ -2521,6 +2536,8 @@ public class BigQueryIO {
PCollection<T> input, DynamicDestinations<T, DestinationT>
dynamicDestinations) {
boolean optimizeWrites = getOptimizeWrites();
SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
+ SerializableFunction<T, TableRow> formatRecordOnFailureFunction =
+ getFormatRecordOnFailureFunction();
RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>
avroRowWriterFactory =
(RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>)
getAvroRowWriterFactory();
@@ -2566,8 +2583,8 @@ public class BigQueryIO {
}
Method method = resolveMethod(input);
+ RowWriterFactory<T, DestinationT> rowWriterFactory;
if (optimizeWrites) {
- RowWriterFactory<T, DestinationT> rowWriterFactory;
if (avroRowWriterFactory != null) {
checkArgument(
formatFunction == null,
@@ -2584,7 +2601,8 @@ public class BigQueryIO {
}
rowWriterFactory = avroRowWriterFactory.prepare(dynamicDestinations,
avroSchemaFactory);
} else if (formatFunction != null) {
- rowWriterFactory = RowWriterFactory.tableRows(formatFunction);
+ rowWriterFactory =
+ RowWriterFactory.tableRows(formatFunction,
formatRecordOnFailureFunction);
} else {
throw new IllegalArgumentException(
"A function must be provided to convert the input type into a
TableRow or "
@@ -2592,20 +2610,6 @@ public class BigQueryIO {
+ "BigQueryIO.Write.withAvroFormatFunction to provide a
formatting function. "
+ "A format function is not required if Beam schemas are
used.");
}
-
- PCollection<KV<DestinationT, T>> rowsWithDestination =
- input
- .apply(
- "PrepareWrite",
- new PrepareWrite<>(dynamicDestinations,
SerializableFunctions.identity()))
- .setCoder(KvCoder.of(destinationCoder, input.getCoder()));
- return continueExpandTyped(
- rowsWithDestination,
- input.getCoder(),
- destinationCoder,
- dynamicDestinations,
- rowWriterFactory,
- method);
} else {
checkArgument(avroRowWriterFactory == null);
checkArgument(
@@ -2615,22 +2619,22 @@ public class BigQueryIO {
+ "BigQueryIO.Write.withAvroFormatFunction to provide a
formatting function. "
+ "A format function is not required if Beam schemas are
used.");
- PCollection<KV<DestinationT, TableRow>> rowsWithDestination =
- input
- .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations,
formatFunction))
- .setCoder(KvCoder.of(destinationCoder,
TableRowJsonCoder.of()));
-
- RowWriterFactory<TableRow, DestinationT> rowWriterFactory =
- RowWriterFactory.tableRows(SerializableFunctions.identity());
-
- return continueExpandTyped(
- rowsWithDestination,
- TableRowJsonCoder.of(),
- destinationCoder,
- dynamicDestinations,
- rowWriterFactory,
- method);
+ rowWriterFactory =
+ RowWriterFactory.tableRows(formatFunction,
formatRecordOnFailureFunction);
}
+ PCollection<KV<DestinationT, T>> rowsWithDestination =
+ input
+ .apply(
+ "PrepareWrite",
+ new PrepareWrite<>(dynamicDestinations,
SerializableFunctions.identity()))
+ .setCoder(KvCoder.of(destinationCoder, input.getCoder()));
+ return continueExpandTyped(
+ rowsWithDestination,
+ input.getCoder(),
+ destinationCoder,
+ dynamicDestinations,
+ rowWriterFactory,
+ method);
}
private <DestinationT, ElementT> WriteResult continueExpandTyped(
@@ -2659,7 +2663,8 @@ public class BigQueryIO {
getCreateDisposition(),
dynamicDestinations,
elementCoder,
- tableRowWriterFactory.getToRowFn())
+ tableRowWriterFactory.getToRowFn(),
+ tableRowWriterFactory.getToFailsafeRowFn())
.withInsertRetryPolicy(retryPolicy)
.withTestServices(getBigQueryServices())
.withExtendedErrorInfo(getExtendedErrorInfo())
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 12f0029..f6894ca 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.util.Histogram;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -154,7 +155,7 @@ public interface BigQueryServices extends Serializable {
*/
<T> long insertAll(
TableReference ref,
- List<ValueInSingleWindow<TableRow>> rowList,
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
@Nullable List<String> insertIdList,
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 5935a76..bff771f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -92,6 +92,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Histogram;
import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -752,7 +753,7 @@ class BigQueryServicesImpl implements BigQueryServices {
@VisibleForTesting
<T> long insertAll(
TableReference ref,
- List<ValueInSingleWindow<TableRow>> rowList,
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
@Nullable List<String> insertIdList,
BackOff backoff,
FluentBackoff rateLimitBackoffFactory,
@@ -781,13 +782,13 @@ class BigQueryServicesImpl implements BigQueryServices {
List<TableDataInsertAllResponse.InsertErrors> allErrors = new
ArrayList<>();
// These lists contain the rows to publish. Initially they contain the
entire list.
// If there are failures, they will contain only the failed rows to be
retried.
- List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList;
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowsToPublish =
rowList;
List<String> idsToPublish = null;
if (!ignoreInsertIds) {
idsToPublish = insertIdList;
}
while (true) {
- List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>();
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new
ArrayList<>();
List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() :
null;
int strideIndex = 0;
@@ -944,7 +945,7 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public <T> long insertAll(
TableReference ref,
- List<ValueInSingleWindow<TableRow>> rowList,
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
@Nullable List<String> insertIdList,
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
index 8b05bd0..7674d68 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
@@ -34,14 +35,18 @@ public interface ErrorContainer<T> extends Serializable {
List<ValueInSingleWindow<T>> failedInserts,
TableDataInsertAllResponse.InsertErrors error,
TableReference ref,
- ValueInSingleWindow<TableRow> tableRow);
+ FailsafeValueInSingleWindow<TableRow, TableRow> tableRow);
ErrorContainer<TableRow> TABLE_ROW_ERROR_CONTAINER =
- (failedInserts, error, ref, tableRow) -> failedInserts.add(tableRow);
+ (failedInserts, error, ref, tableRow) ->
+ failedInserts.add(
+ ValueInSingleWindow.of(
+ tableRow.getFailsafeValue(), tableRow.getTimestamp(),
+ tableRow.getWindow(), tableRow.getPane()));
ErrorContainer<BigQueryInsertError> BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER =
(failedInserts, error, ref, tableRow) -> {
- BigQueryInsertError err = new BigQueryInsertError(tableRow.getValue(),
error, ref);
+ BigQueryInsertError err = new
BigQueryInsertError(tableRow.getFailsafeValue(), error, ref);
failedInserts.add(
ValueInSingleWindow.of(
err, tableRow.getTimestamp(), tableRow.getWindow(),
tableRow.getPane()));
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
index 7229957..a205b3c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
@@ -40,23 +40,35 @@ abstract class RowWriterFactory<ElementT, DestinationT>
implements Serializable
String tempFilePrefix, DestinationT destination) throws Exception;
static <ElementT, DestinationT> RowWriterFactory<ElementT, DestinationT>
tableRows(
- SerializableFunction<ElementT, TableRow> toRow) {
- return new TableRowWriterFactory<ElementT, DestinationT>(toRow);
+ SerializableFunction<ElementT, TableRow> toRow,
+ SerializableFunction<ElementT, TableRow> toFailsafeRow) {
+ return new TableRowWriterFactory<ElementT, DestinationT>(toRow,
toFailsafeRow);
}
static final class TableRowWriterFactory<ElementT, DestinationT>
extends RowWriterFactory<ElementT, DestinationT> {
private final SerializableFunction<ElementT, TableRow> toRow;
+ private final SerializableFunction<ElementT, TableRow> toFailsafeRow;
- private TableRowWriterFactory(SerializableFunction<ElementT, TableRow>
toRow) {
+ private TableRowWriterFactory(
+ SerializableFunction<ElementT, TableRow> toRow,
+ SerializableFunction<ElementT, TableRow> toFailsafeRow) {
this.toRow = toRow;
+ this.toFailsafeRow = toFailsafeRow;
}
public SerializableFunction<ElementT, TableRow> getToRowFn() {
return toRow;
}
+ public SerializableFunction<ElementT, TableRow> getToFailsafeRowFn() {
+ if (toFailsafeRow == null) {
+ return toRow;
+ }
+ return toFailsafeRow;
+ }
+
@Override
public OutputType getOutputType() {
return OutputType.JsonTableRow;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index d00adbb..22415a6 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -42,13 +42,15 @@ public class StreamingInserts<DestinationT, ElementT>
private final String kmsKey;
private final Coder<ElementT> elementCoder;
private final SerializableFunction<ElementT, TableRow> toTableRow;
+ private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
/** Constructor. */
public StreamingInserts(
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations,
Coder<ElementT> elementCoder,
- SerializableFunction<ElementT, TableRow> toTableRow) {
+ SerializableFunction<ElementT, TableRow> toTableRow,
+ SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
this(
createDisposition,
dynamicDestinations,
@@ -60,6 +62,7 @@ public class StreamingInserts<DestinationT, ElementT>
false,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
null);
}
@@ -75,6 +78,7 @@ public class StreamingInserts<DestinationT, ElementT>
boolean ignoreInsertIds,
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toTableRow,
+ SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
String kmsKey) {
this.createDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
@@ -86,6 +90,7 @@ public class StreamingInserts<DestinationT, ElementT>
this.ignoreInsertIds = ignoreInsertIds;
this.elementCoder = elementCoder;
this.toTableRow = toTableRow;
+ this.toFailsafeTableRow = toFailsafeTableRow;
this.kmsKey = kmsKey;
}
@@ -103,6 +108,7 @@ public class StreamingInserts<DestinationT, ElementT>
ignoreInsertIds,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
kmsKey);
}
@@ -119,6 +125,7 @@ public class StreamingInserts<DestinationT, ElementT>
ignoreInsertIds,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
kmsKey);
}
@@ -134,6 +141,7 @@ public class StreamingInserts<DestinationT, ElementT>
ignoreInsertIds,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
kmsKey);
}
@@ -149,6 +157,7 @@ public class StreamingInserts<DestinationT, ElementT>
ignoreInsertIds,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
kmsKey);
}
@@ -164,6 +173,7 @@ public class StreamingInserts<DestinationT, ElementT>
ignoreInsertIds,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
kmsKey);
}
@@ -179,6 +189,7 @@ public class StreamingInserts<DestinationT, ElementT>
ignoreInsertIds,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
kmsKey);
}
@@ -194,6 +205,7 @@ public class StreamingInserts<DestinationT, ElementT>
ignoreInsertIds,
elementCoder,
toTableRow,
+ toFailsafeTableRow,
kmsKey);
}
@@ -215,6 +227,7 @@ public class StreamingInserts<DestinationT, ElementT>
.withIgnoreUnknownValues(ignoreUnknownValues)
.withIgnoreInsertIds(ignoreInsertIds)
.withElementCoder(elementCoder)
- .withToTableRow(toTableRow));
+ .withToTableRow(toTableRow)
+ .withToFailsafeTableRow(toFailsafeTableRow));
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index a956f0d..95e6467 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Histogram;
import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
@@ -58,9 +59,10 @@ class StreamingWriteFn<ErrorT, ElementT>
private final boolean ignoreUnknownValues;
private final boolean ignoreInsertIds;
private final SerializableFunction<ElementT, TableRow> toTableRow;
+ private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
/** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
- private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;
+ private transient Map<String, List<FailsafeValueInSingleWindow<TableRow,
TableRow>>> tableRows;
/** The list of unique ids for each BigQuery table row. */
private transient Map<String, List<String>> uniqueIdsForTableRows;
@@ -79,7 +81,8 @@ class StreamingWriteFn<ErrorT, ElementT>
boolean skipInvalidRows,
boolean ignoreUnknownValues,
boolean ignoreInsertIds,
- SerializableFunction<ElementT, TableRow> toTableRow) {
+ SerializableFunction<ElementT, TableRow> toTableRow,
+ SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
this.bqServices = bqServices;
this.retryPolicy = retryPolicy;
this.failedOutputTag = failedOutputTag;
@@ -88,6 +91,7 @@ class StreamingWriteFn<ErrorT, ElementT>
this.ignoreUnknownValues = ignoreUnknownValues;
this.ignoreInsertIds = ignoreInsertIds;
this.toTableRow = toTableRow;
+ this.toFailsafeTableRow = toFailsafeTableRow;
}
@Setup
@@ -120,13 +124,14 @@ class StreamingWriteFn<ErrorT, ElementT>
BoundedWindow window,
PaneInfo pane) {
String tableSpec = element.getKey().getKey();
- List<ValueInSingleWindow<TableRow>> rows =
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec);
List<String> uniqueIds =
BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows,
tableSpec);
TableRow tableRow = toTableRow.apply(element.getValue().tableRow);
- rows.add(ValueInSingleWindow.of(tableRow, timestamp, window, pane));
+ TableRow failsafeTableRow =
toFailsafeTableRow.apply(element.getValue().tableRow);
+ rows.add(FailsafeValueInSingleWindow.of(tableRow, timestamp, window, pane,
failsafeTableRow));
uniqueIds.add(element.getValue().uniqueId);
}
@@ -135,7 +140,8 @@ class StreamingWriteFn<ErrorT, ElementT>
public void finishBundle(FinishBundleContext context) throws Exception {
List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
BigQueryOptions options =
context.getPipelineOptions().as(BigQueryOptions.class);
- for (Map.Entry<String, List<ValueInSingleWindow<TableRow>>> entry :
tableRows.entrySet()) {
+ for (Map.Entry<String, List<FailsafeValueInSingleWindow<TableRow,
TableRow>>> entry :
+ tableRows.entrySet()) {
TableReference tableReference =
BigQueryHelpers.parseTableSpec(entry.getKey());
flushRows(
tableReference,
@@ -173,7 +179,7 @@ class StreamingWriteFn<ErrorT, ElementT>
/** Writes the accumulated rows into BigQuery with streaming API. */
private void flushRows(
TableReference tableReference,
- List<ValueInSingleWindow<TableRow>> tableRows,
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows,
List<String> uniqueIds,
BigQueryOptions options,
List<ValueInSingleWindow<ErrorT>> failedInserts)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 81f097a..5940415 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -56,6 +56,7 @@ public class StreamingWriteTables<ElementT>
private final boolean ignoreInsertIds;
private final Coder<ElementT> elementCoder;
private final SerializableFunction<ElementT, TableRow> toTableRow;
+ private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
public StreamingWriteTables() {
this(
@@ -66,7 +67,8 @@ public class StreamingWriteTables<ElementT>
false, // ignoreUnknownValues
false, // ignoreInsertIds
null, // elementCoder
- null); // toTableRow
+ null, // toTableRow
+ null); // toFailsafeTableRow
}
private StreamingWriteTables(
@@ -77,7 +79,8 @@ public class StreamingWriteTables<ElementT>
boolean ignoreUnknownValues,
boolean ignoreInsertIds,
Coder<ElementT> elementCoder,
- SerializableFunction<ElementT, TableRow> toTableRow) {
+ SerializableFunction<ElementT, TableRow> toTableRow,
+ SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
this.bigQueryServices = bigQueryServices;
this.retryPolicy = retryPolicy;
this.extendedErrorInfo = extendedErrorInfo;
@@ -86,6 +89,7 @@ public class StreamingWriteTables<ElementT>
this.ignoreInsertIds = ignoreInsertIds;
this.elementCoder = elementCoder;
this.toTableRow = toTableRow;
+ this.toFailsafeTableRow = toFailsafeTableRow;
}
StreamingWriteTables<ElementT> withTestServices(BigQueryServices
bigQueryServices) {
@@ -97,7 +101,8 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
}
StreamingWriteTables<ElementT> withInsertRetryPolicy(InsertRetryPolicy
retryPolicy) {
@@ -109,7 +114,8 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
}
StreamingWriteTables<ElementT> withExtendedErrorInfo(boolean
extendedErrorInfo) {
@@ -121,7 +127,8 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
}
StreamingWriteTables<ElementT> withSkipInvalidRows(boolean skipInvalidRows) {
@@ -133,7 +140,8 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
}
StreamingWriteTables<ElementT> withIgnoreUnknownValues(boolean
ignoreUnknownValues) {
@@ -145,7 +153,8 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
}
StreamingWriteTables<ElementT> withIgnoreInsertIds(boolean ignoreInsertIds) {
@@ -157,7 +166,8 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
}
StreamingWriteTables<ElementT> withElementCoder(Coder<ElementT>
elementCoder) {
@@ -169,7 +179,8 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
}
StreamingWriteTables<ElementT> withToTableRow(
@@ -182,7 +193,22 @@ public class StreamingWriteTables<ElementT>
ignoreUnknownValues,
ignoreInsertIds,
elementCoder,
- toTableRow);
+ toTableRow,
+ toFailsafeTableRow);
+ }
+
+ StreamingWriteTables<ElementT> withToFailsafeTableRow(
+ SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
+ return new StreamingWriteTables<>(
+ bigQueryServices,
+ retryPolicy,
+ extendedErrorInfo,
+ skipInvalidRows,
+ ignoreUnknownValues,
+ ignoreInsertIds,
+ elementCoder,
+ toTableRow,
+ toFailsafeTableRow);
}
@Override
@@ -264,7 +290,8 @@ public class StreamingWriteTables<ElementT>
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
- toTableRow))
+ toTableRow,
+ toFailsafeTableRow))
.withOutputTags(mainOutputTag,
TupleTagList.of(failedInsertsTag)));
PCollection<T> failedInserts = tuple.get(failedInsertsTag);
failedInserts.setCoder(coder);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 963edb4..149e068 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -213,14 +214,15 @@ public class FakeDatasetService implements
DatasetService, Serializable {
public long insertAll(
TableReference ref, List<TableRow> rowList, @Nullable List<String>
insertIdList)
throws IOException, InterruptedException {
- List<ValueInSingleWindow<TableRow>> windowedRows = Lists.newArrayList();
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> windowedRows =
Lists.newArrayList();
for (TableRow row : rowList) {
windowedRows.add(
- ValueInSingleWindow.of(
+ FailsafeValueInSingleWindow.of(
row,
GlobalWindow.TIMESTAMP_MAX_VALUE,
GlobalWindow.INSTANCE,
- PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ row));
}
return insertAll(
ref,
@@ -237,7 +239,7 @@ public class FakeDatasetService implements DatasetService,
Serializable {
@Override
public <T> long insertAll(
TableReference ref,
- List<ValueInSingleWindow<TableRow>> rowList,
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
@Nullable List<String> insertIdList,
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 4489139..d22b04e 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1556,7 +1556,8 @@ public class BigQueryIOWriteTest implements Serializable {
BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION,
multiPartitionsTag,
singlePartitionTag,
- RowWriterFactory.tableRows(SerializableFunctions.identity()));
+ RowWriterFactory.tableRows(
+ SerializableFunctions.identity(),
SerializableFunctions.identity()));
DoFnTester<
Iterable<WriteBundlesToFiles.Result<TableDestination>>,
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 15bfc0a..a27cdbb 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -78,6 +78,7 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -483,7 +484,16 @@ public class BigQueryServicesImplTest {
verify(response, times(1)).getContentType();
}
- private <T> ValueInSingleWindow<T> wrapValue(T value) {
+ private <T> FailsafeValueInSingleWindow<T, T> wrapValue(T value) {
+ return FailsafeValueInSingleWindow.of(
+ value,
+ GlobalWindow.TIMESTAMP_MAX_VALUE,
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ value);
+ }
+
+ private <T> ValueInSingleWindow<T> wrapErrorValue(T value) {
return ValueInSingleWindow.of(
value,
GlobalWindow.TIMESTAMP_MAX_VALUE,
@@ -496,7 +506,7 @@ public class BigQueryServicesImplTest {
public void testInsertRateLimitRetry() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new
ArrayList<>();
rows.add(wrapValue(new TableRow()));
// First response is 403 rate limited, second response has valid payload.
@@ -532,7 +542,7 @@ public class BigQueryServicesImplTest {
public void testInsertQuotaExceededRetry() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new
ArrayList<>();
rows.add(wrapValue(new TableRow()));
// First response is 403 quota exceeded, second response has valid payload.
@@ -568,7 +578,7 @@ public class BigQueryServicesImplTest {
public void testInsertStoppedRetry() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new
ArrayList<>();
rows.add(wrapValue(new TableRow()));
// Respond 403 four times, then valid payload.
@@ -620,7 +630,7 @@ public class BigQueryServicesImplTest {
public void testInsertRetrySelectRows() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows =
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
ImmutableList.of(
wrapValue(new TableRow().set("row", "a")), wrapValue(new
TableRow().set("row", "b")));
List<String> insertIds = ImmutableList.of("a", "b");
@@ -664,7 +674,7 @@ public class BigQueryServicesImplTest {
public void testInsertFailsGracefully() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows =
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
final TableDataInsertAllResponse row1Failed =
@@ -723,7 +733,7 @@ public class BigQueryServicesImplTest {
public void testFailInsertOtherRetry() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new
ArrayList<>();
rows.add(wrapValue(new TableRow()));
// First response is 403 non-{rate-limited, quota-exceeded}, second
response has valid payload
@@ -767,7 +777,7 @@ public class BigQueryServicesImplTest {
public void testInsertRetryPolicy() throws InterruptedException, IOException
{
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows =
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
// First time row0 fails with a retryable error, and row1 fails with a
persistent error.
@@ -836,7 +846,7 @@ public class BigQueryServicesImplTest {
throws InterruptedException, IOException {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows =
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
final TableDataInsertAllResponse allRowsSucceeded = new
TableDataInsertAllResponse();
@@ -1051,10 +1061,14 @@ public class BigQueryServicesImplTest {
public void testSimpleErrorRetrieval() throws InterruptedException,
IOException {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows =
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
ImmutableList.of(
wrapValue(new TableRow().set("a", 1)), wrapValue(new
TableRow().set("b", 2)));
+ final List<ValueInSingleWindow<TableRow>> expected =
+ ImmutableList.of(
+ wrapErrorValue(new TableRow().set("a", 1)), wrapErrorValue(new
TableRow().set("b", 2)));
+
final TableDataInsertAllResponse failures =
new TableDataInsertAllResponse()
.setInsertErrors(
@@ -1090,7 +1104,7 @@ public class BigQueryServicesImplTest {
false,
false);
- assertThat(failedInserts, is(rows));
+ assertThat(failedInserts, is(expected));
}
/** Tests that {@link DatasetServiceImpl#insertAll} uses the supplied {@link
ErrorContainer}. */
@@ -1098,7 +1112,7 @@ public class BigQueryServicesImplTest {
public void testExtendedErrorRetrieval() throws InterruptedException,
IOException {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<ValueInSingleWindow<TableRow>> rows =
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
ImmutableList.of(
wrapValue(new TableRow().set("a", 1)), wrapValue(new
TableRow().set("b", 2)));
@@ -1115,10 +1129,10 @@ public class BigQueryServicesImplTest {
final List<ValueInSingleWindow<BigQueryInsertError>> expected =
ImmutableList.of(
- wrapValue(
+ wrapErrorValue(
new BigQueryInsertError(
rows.get(0).getValue(), failures.getInsertErrors().get(0),
ref)),
- wrapValue(
+ wrapErrorValue(
new BigQueryInsertError(
rows.get(1).getValue(), failures.getInsertErrors().get(1),
ref)));
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index deb1952..b433254 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -196,15 +196,16 @@ public class BigQueryUtilTest {
TableReference ref =
BigQueryHelpers.parseTableSpec("project:dataset.table");
DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient,
options, 5);
- List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new
ArrayList<>();
List<String> ids = new ArrayList<>();
for (int i = 0; i < 25; ++i) {
rows.add(
- ValueInSingleWindow.of(
+ FailsafeValueInSingleWindow.of(
rawRow("foo", 1234),
GlobalWindow.TIMESTAMP_MAX_VALUE,
GlobalWindow.INSTANCE,
- PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ rawRow("foo", 1234)));
ids.add("");
}