[
https://issues.apache.org/jira/browse/BEAM-4257?focusedWorklogId=125793&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125793
]
ASF GitHub Bot logged work on BEAM-4257:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Jul/18 16:02
Start Date: 22/Jul/18 16:02
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #5341: [BEAM-4257]
Increases BigQuery streaming error information
URL: https://github.com/apache/beam/pull/5341
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 26c96318375..e639f0c2acf 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1052,6 +1052,7 @@ static String getExtractDestinationUri(String
extractDestinationDir) {
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
.setNumFileShards(0)
.setMethod(Write.Method.DEFAULT)
+ .setExtendedErrorInfo(false)
.build();
}
@@ -1158,6 +1159,8 @@ static String getExtractDestinationUri(String
extractDestinationDir) {
@Nullable
abstract ValueProvider<String> getCustomGcsTempLocation();
+ abstract boolean getExtendedErrorInfo();
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -1203,6 +1206,8 @@ static String getExtractDestinationUri(String
extractDestinationDir) {
abstract Builder<T> setCustomGcsTempLocation(ValueProvider<String>
customGcsTempLocation);
+ abstract Builder<T> setExtendedErrorInfo(boolean extendedErrorInfo);
+
abstract Write<T> build();
}
@@ -1482,6 +1487,16 @@ static String getExtractDestinationUri(String
extractDestinationDir) {
return
toBuilder().setCustomGcsTempLocation(customGcsTempLocation).build();
}
+ /**
+ * Enables extended error information by enabling {@link
WriteResult#getFailedInsertsWithErr()}
+ *
+ * <p>At the moment this only works if using {@link Method#STREAMING_INSERTS}. See
{@link
+ * Write#withMethod(Method)}.
+ */
+ public Write<T> withExtendedErrorInfo() {
+ return toBuilder().setExtendedErrorInfo(true).build();
+ }
+
@VisibleForTesting
/** This method is for test usage only */
public Write<T> withTestServices(BigQueryServices testServices) {
@@ -1666,7 +1681,8 @@ public WriteResult expand(PCollection<T> input) {
StreamingInserts<DestinationT> streamingInserts =
new StreamingInserts<>(getCreateDisposition(), dynamicDestinations)
.withInsertRetryPolicy(retryPolicy)
- .withTestServices((getBigQueryServices()));
+ .withTestServices((getBigQueryServices()))
+ .withExtendedErrorInfo(getExtendedErrorInfo());
return rowsWithDestination.apply(streamingInserts);
} else {
checkArgument(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertError.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertError.java
new file mode 100644
index 00000000000..cdd814587e8
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertError.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Objects;
+
+/**
+ * Model definition for BigQueryInsertError.
+ *
+ * <p>This class represents an error inserting a {@link TableRow} into
BigQuery.
+ */
+public class BigQueryInsertError {
+
+ /** The {@link TableRow} that could not be inserted. */
+ private TableRow row;
+
+ /**
+ * The {@link
com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors}
+ * caused.
+ */
+ private TableDataInsertAllResponse.InsertErrors error;
+
+ /**
+ * The {@link TableReference} where the {@link BigQueryInsertError#row} was
tried to be inserted.
+ */
+ private TableReference table;
+
+ public BigQueryInsertError(
+ TableRow row, TableDataInsertAllResponse.InsertErrors error,
TableReference table) {
+ this.row = row;
+ this.error = error;
+ this.table = table;
+ }
+
+ public TableRow getRow() {
+ return row;
+ }
+
+ public TableDataInsertAllResponse.InsertErrors getError() {
+ return error;
+ }
+
+ public TableReference getTable() {
+ return table;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BigQueryInsertError that = (BigQueryInsertError) o;
+ return Objects.equals(row, that.getRow())
+ && Objects.equals(error, that.getError())
+ && Objects.equals(table, that.getTable());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(row, error, table);
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertErrorCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertErrorCoder.java
new file mode 100644
index 00000000000..f557a6c6baf
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertErrorCoder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A {@link Coder} that encodes BigQuery {@link BigQueryInsertError} objects.
*/
+public class BigQueryInsertErrorCoder extends AtomicCoder<BigQueryInsertError>
{
+
+ public static BigQueryInsertErrorCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(BigQueryInsertError value, OutputStream outStream) throws
IOException {
+ String errorStrValue = MAPPER.writeValueAsString(value.getError());
+ StringUtf8Coder.of().encode(errorStrValue, outStream);
+
+ TableRowJsonCoder.of().encode(value.getRow(), outStream);
+
+ StringUtf8Coder.of().encode(BigQueryHelpers.toTableSpec(value.getTable()),
outStream);
+ }
+
+ @Override
+ public BigQueryInsertError decode(InputStream inStream) throws IOException {
+ TableDataInsertAllResponse.InsertErrors err =
+ MAPPER.readValue(
+ StringUtf8Coder.of().decode(inStream),
TableDataInsertAllResponse.InsertErrors.class);
+ TableRow row = TableRowJsonCoder.of().decode(inStream);
+ TableReference ref =
BigQueryHelpers.parseTableSpec(StringUtf8Coder.of().decode(inStream));
+ return new BigQueryInsertError(row, err, ref);
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(BigQueryInsertError value) throws
Exception {
+ String errorStrValue = MAPPER.writeValueAsString(value.getError());
+ String tableStrValue = MAPPER.writeValueAsString(value.getTable());
+ return StringUtf8Coder.of().getEncodedElementByteSize(errorStrValue)
+ + TableRowJsonCoder.of().getEncodedElementByteSize(value.getRow())
+ + StringUtf8Coder.of().getEncodedElementByteSize(tableStrValue);
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
BigQueryInsertError.
+ // NON_FINAL default typing is enabled so that the java.lang.Long value on
the errors index can be
+ // properly deserialized as such and not as an Integer instead.
+ private static final ObjectMapper MAPPER =
+ new ObjectMapper()
+ .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
+ .enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
+
+ private static final BigQueryInsertErrorCoder INSTANCE = new
BigQueryInsertErrorCoder();
+ private static final TypeDescriptor<BigQueryInsertError> TYPE_DESCRIPTOR =
+ new TypeDescriptor<BigQueryInsertError>() {};
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws NonDeterministicException always. A {@link TableRow} can hold
arbitrary {@link Object}
+ * instances, which makes the encoding non-deterministic.
+ */
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(
+ this, "TableRow can hold arbitrary instances, which may be
non-deterministic.");
+ }
+
+ @Override
+ public TypeDescriptor<BigQueryInsertError> getEncodedTypeDescriptor() {
+ return TYPE_DESCRIPTOR;
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 96edd3b3222..395f66bab00 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -32,12 +32,10 @@
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.ValueInSingleWindow;
/** An interface for real, mock, or fake implementations of Cloud BigQuery
services. */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public interface BigQueryServices extends Serializable {
+interface BigQueryServices extends Serializable {
/** Returns a real, mock, or fake {@link JobService}. */
JobService getJobService(BigQueryOptions bqOptions);
@@ -137,12 +135,13 @@ void createDataset(
*
* <p>Returns the total bytes count of {@link TableRow TableRows}.
*/
- long insertAll(
+ <T> long insertAll(
TableReference ref,
List<ValueInSingleWindow<TableRow>> rowList,
@Nullable List<String> insertIdList,
InsertRetryPolicy retryPolicy,
- List<ValueInSingleWindow<TableRow>> failedInserts)
+ List<ValueInSingleWindow<T>> failedInserts,
+ ErrorContainer<T> errorContainer)
throws IOException, InterruptedException;
/** Patch BigQuery {@link Table} description. */
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 6650235b8f8..05fe01bc487 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -655,14 +655,15 @@ public void deleteDataset(String projectId, String
datasetId)
}
@VisibleForTesting
- long insertAll(
+ <T> long insertAll(
TableReference ref,
List<ValueInSingleWindow<TableRow>> rowList,
@Nullable List<String> insertIdList,
BackOff backoff,
final Sleeper sleeper,
InsertRetryPolicy retryPolicy,
- List<ValueInSingleWindow<TableRow>> failedInserts)
+ List<ValueInSingleWindow<T>> failedInserts,
+ ErrorContainer<T> errorContainer)
throws IOException, InterruptedException {
checkNotNull(ref, "ref");
if (executor == null) {
@@ -766,7 +767,7 @@ long insertAll(
retryIds.add(idsToPublish.get(errorIndex));
}
} else {
- failedInserts.add(rowsToPublish.get(errorIndex));
+ errorContainer.add(failedInserts, error, ref,
rowsToPublish.get(errorIndex));
}
}
}
@@ -803,12 +804,13 @@ long insertAll(
}
@Override
- public long insertAll(
+ public <T> long insertAll(
TableReference ref,
List<ValueInSingleWindow<TableRow>> rowList,
@Nullable List<String> insertIdList,
InsertRetryPolicy retryPolicy,
- List<ValueInSingleWindow<TableRow>> failedInserts)
+ List<ValueInSingleWindow<T>> failedInserts,
+ ErrorContainer<T> errorContainer)
throws IOException, InterruptedException {
return insertAll(
ref,
@@ -817,7 +819,8 @@ public long insertAll(
BackOffAdapter.toGcpBackOff(INSERT_BACKOFF_FACTORY.backoff()),
Sleeper.DEFAULT,
retryPolicy,
- failedInserts);
+ failedInserts,
+ errorContainer);
}
@Override
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
new file mode 100644
index 00000000000..8b05bd0fd5c
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ErrorContainer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+
+/**
+ * ErrorContainer interface.
+ *
+ * @param <T>
+ */
+public interface ErrorContainer<T> extends Serializable {
+ void add(
+ List<ValueInSingleWindow<T>> failedInserts,
+ TableDataInsertAllResponse.InsertErrors error,
+ TableReference ref,
+ ValueInSingleWindow<TableRow> tableRow);
+
+ ErrorContainer<TableRow> TABLE_ROW_ERROR_CONTAINER =
+ (failedInserts, error, ref, tableRow) -> failedInserts.add(tableRow);
+
+ ErrorContainer<BigQueryInsertError> BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER =
+ (failedInserts, error, ref, tableRow) -> {
+ BigQueryInsertError err = new BigQueryInsertError(tableRow.getValue(),
error, ref);
+ failedInserts.add(
+ ValueInSingleWindow.of(
+ err, tableRow.getTimestamp(), tableRow.getWindow(),
tableRow.getPane()));
+ };
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index 1c80ac4f339..b3658c524a8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -34,6 +34,7 @@
private final CreateDisposition createDisposition;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private InsertRetryPolicy retryPolicy;
+ private boolean extendedErrorInfo;
/** Constructor. */
public StreamingInserts(
@@ -43,7 +44,8 @@ public StreamingInserts(
createDisposition,
dynamicDestinations,
new BigQueryServicesImpl(),
- InsertRetryPolicy.alwaysRetry());
+ InsertRetryPolicy.alwaysRetry(),
+ false);
}
/** Constructor. */
@@ -51,22 +53,30 @@ private StreamingInserts(
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations,
BigQueryServices bigQueryServices,
- InsertRetryPolicy retryPolicy) {
+ InsertRetryPolicy retryPolicy,
+ boolean extendedErrorInfo) {
this.createDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
this.bigQueryServices = bigQueryServices;
this.retryPolicy = retryPolicy;
+ this.extendedErrorInfo = extendedErrorInfo;
}
/** Specify a retry policy for failed inserts. */
public StreamingInserts<DestinationT>
withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
return new StreamingInserts<>(
- createDisposition, dynamicDestinations, bigQueryServices, retryPolicy);
+ createDisposition, dynamicDestinations, bigQueryServices, retryPolicy,
extendedErrorInfo);
+ }
+
+ /** Specify whether to use extended error info or not. */
+ public StreamingInserts<DestinationT> withExtendedErrorInfo(boolean
extendedErrorInfo) {
+ return new StreamingInserts<>(
+ createDisposition, dynamicDestinations, bigQueryServices, retryPolicy,
extendedErrorInfo);
}
StreamingInserts<DestinationT> withTestServices(BigQueryServices
bigQueryServices) {
return new StreamingInserts<>(
- createDisposition, dynamicDestinations, bigQueryServices, retryPolicy);
+ createDisposition, dynamicDestinations, bigQueryServices, retryPolicy,
extendedErrorInfo);
}
@Override
@@ -80,6 +90,7 @@ public WriteResult expand(PCollection<KV<DestinationT,
TableRow>> input) {
return writes.apply(
new StreamingWriteTables()
.withTestServices(bigQueryServices)
- .withInsertRetryPolicy(retryPolicy));
+ .withInsertRetryPolicy(retryPolicy)
+ .withExtendedErrorInfo(extendedErrorInfo));
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index de097fbc0ed..2af900e86b1 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -39,10 +39,11 @@
/** Implementation of DoFn to perform streaming BigQuery write. */
@SystemDoFnInternal
@VisibleForTesting
-class StreamingWriteFn extends DoFn<KV<ShardedKey<String>, TableRowInfo>,
Void> {
+class StreamingWriteFn<ErrorT> extends DoFn<KV<ShardedKey<String>,
TableRowInfo>, Void> {
private final BigQueryServices bqServices;
private final InsertRetryPolicy retryPolicy;
- private final TupleTag<TableRow> failedOutputTag;
+ private final TupleTag<ErrorT> failedOutputTag;
+ private final ErrorContainer<ErrorT> errorContainer;
/** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;
@@ -56,10 +57,12 @@
StreamingWriteFn(
BigQueryServices bqServices,
InsertRetryPolicy retryPolicy,
- TupleTag<TableRow> failedOutputTag) {
+ TupleTag<ErrorT> failedOutputTag,
+ ErrorContainer<ErrorT> errorContainer) {
this.bqServices = bqServices;
this.retryPolicy = retryPolicy;
this.failedOutputTag = failedOutputTag;
+ this.errorContainer = errorContainer;
}
/** Prepares a target BigQuery table. */
@@ -87,7 +90,7 @@ public void processElement(ProcessContext context,
BoundedWindow window) {
/** Writes the accumulated rows into BigQuery with streaming API. */
@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
- List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList();
+ List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
BigQueryOptions options =
context.getPipelineOptions().as(BigQueryOptions.class);
for (Map.Entry<String, List<ValueInSingleWindow<TableRow>>> entry :
tableRows.entrySet()) {
TableReference tableReference =
BigQueryHelpers.parseTableSpec(entry.getKey());
@@ -101,7 +104,7 @@ public void finishBundle(FinishBundleContext context)
throws Exception {
tableRows.clear();
uniqueIdsForTableRows.clear();
- for (ValueInSingleWindow<TableRow> row : failedInserts) {
+ for (ValueInSingleWindow<ErrorT> row : failedInserts) {
context.output(failedOutputTag, row.getValue(), row.getTimestamp(),
row.getWindow());
}
}
@@ -112,14 +115,20 @@ private void flushRows(
List<ValueInSingleWindow<TableRow>> tableRows,
List<String> uniqueIds,
BigQueryOptions options,
- List<ValueInSingleWindow<TableRow>> failedInserts)
+ List<ValueInSingleWindow<ErrorT>> failedInserts)
throws InterruptedException {
if (!tableRows.isEmpty()) {
try {
long totalBytes =
bqServices
.getDatasetService(options)
- .insertAll(tableReference, tableRows, uniqueIds, retryPolicy,
failedInserts);
+ .insertAll(
+ tableReference,
+ tableRows,
+ uniqueIds,
+ retryPolicy,
+ failedInserts,
+ errorContainer);
byteCounter.inc(totalBytes);
} catch (IOException e) {
throw new RuntimeException(e);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index c460c4120c8..8767f97b7e6 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -46,26 +47,60 @@
extends PTransform<PCollection<KV<TableDestination, TableRow>>,
WriteResult> {
private BigQueryServices bigQueryServices;
private InsertRetryPolicy retryPolicy;
+ private boolean extendedErrorInfo;
+ private static final String FAILED_INSERTS_TAG_ID = "failedInserts";
public StreamingWriteTables() {
- this(new BigQueryServicesImpl(), InsertRetryPolicy.alwaysRetry());
+ this(new BigQueryServicesImpl(), InsertRetryPolicy.alwaysRetry(), false);
}
- private StreamingWriteTables(BigQueryServices bigQueryServices,
InsertRetryPolicy retryPolicy) {
+ private StreamingWriteTables(
+ BigQueryServices bigQueryServices, InsertRetryPolicy retryPolicy,
boolean extendedErrorInfo) {
this.bigQueryServices = bigQueryServices;
this.retryPolicy = retryPolicy;
+ this.extendedErrorInfo = extendedErrorInfo;
}
StreamingWriteTables withTestServices(BigQueryServices bigQueryServices) {
- return new StreamingWriteTables(bigQueryServices, retryPolicy);
+ return new StreamingWriteTables(bigQueryServices, retryPolicy,
extendedErrorInfo);
}
StreamingWriteTables withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
- return new StreamingWriteTables(bigQueryServices, retryPolicy);
+ return new StreamingWriteTables(bigQueryServices, retryPolicy,
extendedErrorInfo);
+ }
+
+ StreamingWriteTables withExtendedErrorInfo(boolean extendedErrorInfo) {
+ return new StreamingWriteTables(bigQueryServices, retryPolicy,
extendedErrorInfo);
}
@Override
public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input)
{
+ if (extendedErrorInfo) {
+ TupleTag<BigQueryInsertError> failedInsertsTag = new
TupleTag<>(FAILED_INSERTS_TAG_ID);
+ PCollection<BigQueryInsertError> failedInserts =
+ writeAndGetErrors(
+ input,
+ failedInsertsTag,
+ BigQueryInsertErrorCoder.of(),
+ ErrorContainer.BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER);
+ return WriteResult.withExtendedErrors(input.getPipeline(),
failedInsertsTag, failedInserts);
+ } else {
+ TupleTag<TableRow> failedInsertsTag = new
TupleTag<>(FAILED_INSERTS_TAG_ID);
+ PCollection<TableRow> failedInserts =
+ writeAndGetErrors(
+ input,
+ failedInsertsTag,
+ TableRowJsonCoder.of(),
+ ErrorContainer.TABLE_ROW_ERROR_CONTAINER);
+ return WriteResult.in(input.getPipeline(), failedInsertsTag,
failedInserts);
+ }
+ }
+
+ private <T> PCollection<T> writeAndGetErrors(
+ PCollection<KV<TableDestination, TableRow>> input,
+ TupleTag<T> failedInsertsTag,
+ AtomicCoder<T> coder,
+ ErrorContainer<T> errorContainer) {
// A naive implementation would be to simply stream data directly to
BigQuery.
// However, this could occasionally lead to duplicated data, e.g., when
// a VM that runs this code is restarted and the code is re-run.
@@ -85,12 +120,12 @@ public WriteResult expand(PCollection<KV<TableDestination,
TableRow>> input) {
.apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds()))
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()),
TableRowInfoCoder.of()));
+ TupleTag<Void> mainOutputTag = new TupleTag<>("mainOutput");
+
// To prevent having the same TableRow processed more than once with
regenerated
// different unique ids, this implementation relies on "checkpointing",
which is
// achieved as a side effect of having StreamingWriteFn immediately follow
a GBK,
// performed by Reshuffle.
- TupleTag<Void> mainOutputTag = new TupleTag<>("mainOutput");
- TupleTag<TableRow> failedInsertsTag = new TupleTag<>("failedInserts");
PCollectionTuple tuple =
tagged
.apply(Reshuffle.of())
@@ -103,10 +138,12 @@ public WriteResult
expand(PCollection<KV<TableDestination, TableRow>> input) {
.discardingFiredPanes())
.apply(
"StreamingWrite",
- ParDo.of(new StreamingWriteFn(bigQueryServices, retryPolicy,
failedInsertsTag))
+ ParDo.of(
+ new StreamingWriteFn<>(
+ bigQueryServices, retryPolicy, failedInsertsTag,
errorContainer))
.withOutputTags(mainOutputTag,
TupleTagList.of(failedInsertsTag)));
- PCollection<TableRow> failedInserts = tuple.get(failedInsertsTag);
- failedInserts.setCoder(TableRowJsonCoder.of());
- return WriteResult.in(input.getPipeline(), failedInsertsTag,
failedInserts);
+ PCollection<T> failedInserts = tuple.get(failedInsertsTag);
+ failedInserts.setCoder(coder);
+ return failedInserts;
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index 12097a618e4..72c49d12c9d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
@@ -33,29 +35,74 @@
private final Pipeline pipeline;
private final TupleTag<TableRow> failedInsertsTag;
private final PCollection<TableRow> failedInserts;
+ private final TupleTag<BigQueryInsertError> failedInsertsWithErrTag;
+ private final PCollection<BigQueryInsertError> failedInsertsWithErr;
/** Creates a {@link WriteResult} in the given {@link Pipeline}. */
static WriteResult in(
Pipeline pipeline, TupleTag<TableRow> failedInsertsTag,
PCollection<TableRow> failedInserts) {
- return new WriteResult(pipeline, failedInsertsTag, failedInserts);
+ return new WriteResult(pipeline, failedInsertsTag, failedInserts, null,
null);
+ }
+
+ static WriteResult withExtendedErrors(
+ Pipeline pipeline,
+ TupleTag<BigQueryInsertError> failedInsertsTag,
+ PCollection<BigQueryInsertError> failedInserts) {
+ return new WriteResult(pipeline, null, null, failedInsertsTag,
failedInserts);
}
@Override
public Map<TupleTag<?>, PValue> expand() {
- return ImmutableMap.of(failedInsertsTag, failedInserts);
+ if (failedInsertsTag != null) {
+ return ImmutableMap.of(failedInsertsTag, failedInserts);
+ } else {
+ return ImmutableMap.of(failedInsertsWithErrTag, failedInsertsWithErr);
+ }
}
private WriteResult(
- Pipeline pipeline, TupleTag<TableRow> failedInsertsTag,
PCollection<TableRow> failedInserts) {
+ Pipeline pipeline,
+ TupleTag<TableRow> failedInsertsTag,
+ PCollection<TableRow> failedInserts,
+ TupleTag<BigQueryInsertError> failedInsertsWithErrTag,
+ PCollection<BigQueryInsertError> failedInsertsWithErr) {
this.pipeline = pipeline;
this.failedInsertsTag = failedInsertsTag;
this.failedInserts = failedInserts;
+ this.failedInsertsWithErrTag = failedInsertsWithErrTag;
+ this.failedInsertsWithErr = failedInsertsWithErr;
}
+ /**
+ * Returns a {@link PCollection} containing the {@link TableRow}s that
didn't make it to BigQuery.
+ *
+ * <p>Only use this method if you haven't enabled {@link
+ * BigQueryIO.Write#withExtendedErrorInfo()}. Otherwise use {@link
+ * WriteResult#getFailedInsertsWithErr()}
+ */
public PCollection<TableRow> getFailedInserts() {
+ checkArgument(
+ failedInsertsTag != null,
+ "Cannot use getFailedInserts as this WriteResult uses extended errors"
+ + " information. Use getFailedInsertsWithErr instead");
return failedInserts;
}
+ /**
+ * Returns a {@link PCollection} containing the {@link BigQueryInsertError}s
with detailed error
+ * information.
+ *
+ * <p>Only use this method if you have enabled {@link
BigQueryIO.Write#withExtendedErrorInfo()}.
+ * Otherwise use {@link WriteResult#getFailedInserts()}
+ */
+ public PCollection<BigQueryInsertError> getFailedInsertsWithErr() {
+ checkArgument(
+ failedInsertsWithErrTag != null,
+ "Cannot use getFailedInsertsWithErr as this WriteResult does not use"
+ + " extended errors. Use getFailedInserts instead");
+ return failedInsertsWithErr;
+ }
+
@Override
public Pipeline getPipeline() {
return pipeline;
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index df3eb383d8a..39e81e4cff0 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -26,11 +26,13 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Table;
@@ -1368,4 +1370,103 @@ public void testWriteToTableDecorator() throws
Exception {
.withoutValidation());
p.run();
}
+
+ @Test
+ public void testExtendedErrorRetrieval() throws Exception {
+ TableRow row1 = new TableRow().set("name", "a").set("number", "1");
+ TableRow row2 = new TableRow().set("name", "b").set("number", "2");
+ TableRow row3 = new TableRow().set("name", "c").set("number", "3");
+ String tableSpec = "project-id:dataset-id.table-id";
+
+ TableDataInsertAllResponse.InsertErrors ephemeralError =
+ new TableDataInsertAllResponse.InsertErrors()
+ .setErrors(ImmutableList.of(new
ErrorProto().setReason("timeout")));
+ TableDataInsertAllResponse.InsertErrors persistentError =
+ new TableDataInsertAllResponse.InsertErrors()
+ .setErrors(Lists.newArrayList(new
ErrorProto().setReason("invalidQuery")));
+
+ fakeDatasetService.failOnInsert(
+ ImmutableMap.of(
+ row1, ImmutableList.of(ephemeralError, ephemeralError),
+ row2, ImmutableList.of(ephemeralError, ephemeralError,
persistentError)));
+
+ PCollection<BigQueryInsertError> failedRows =
+ p.apply(Create.of(row1, row2, row3))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(tableSpec)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("name").setType("STRING"),
+ new
TableFieldSchema().setName("number").setType("INTEGER"))))
+
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
+ .withTestServices(fakeBqServices)
+ .withoutValidation()
+ .withExtendedErrorInfo())
+ .getFailedInsertsWithErr();
+
+ // row2 finally fails with a non-retryable error, so we expect to see it
in the collection of
+ // failed rows.
+ PAssert.that(failedRows)
+ .containsInAnyOrder(
+ new BigQueryInsertError(
+ row2, persistentError,
BigQueryHelpers.parseTableSpec(tableSpec)));
+ p.run();
+
+ // Only row1 and row3 were successfully inserted.
+ assertThat(
+ fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ containsInAnyOrder(row1, row3));
+ }
+
+ @Test
+ public void testWrongErrorConfigs() {
+ p.enableAutoRunIfMissing(true);
+ TableRow row1 = new TableRow().set("name", "a").set("number", "1");
+
+ BigQueryIO.Write<TableRow> bqIoWrite =
+ BigQueryIO.writeTableRows()
+ .to("project-id:dataset-id.table-id")
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("name").setType("STRING"),
+ new
TableFieldSchema().setName("number").setType("INTEGER"))))
+
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
+ .withTestServices(fakeBqServices)
+ .withoutValidation();
+
+ try {
+ p.apply("Create1", Create.<TableRow>of(row1))
+ .apply("Write 1", bqIoWrite)
+ .getFailedInsertsWithErr();
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertThat(
+ e.getMessage(),
+ is(
+ "Cannot use getFailedInsertsWithErr as this WriteResult "
+ + "does not use extended errors. Use getFailedInserts
instead"));
+ }
+
+ try {
+ p.apply("Create2", Create.<TableRow>of(row1))
+ .apply("Write2", bqIoWrite.withExtendedErrorInfo())
+ .getFailedInserts();
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertThat(
+ e.getMessage(),
+ is(
+ "Cannot use getFailedInserts as this WriteResult "
+ + "uses extended errors information. Use
getFailedInsertsWithErr instead"));
+ }
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertErrorCoderTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertErrorCoderTest.java
new file mode 100644
index 00000000000..7f755b390aa
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryInsertErrorCoderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test case for {@link BigQueryInsertErrorCoder}. */
+@RunWith(JUnit4.class)
+public class BigQueryInsertErrorCoderTest {
+
+ private static final Coder<BigQueryInsertError> TEST_CODER =
BigQueryInsertErrorCoder.of();
+
+ @Test
+ public void testDecodeEncodeEqual() throws Exception {
+ BigQueryInsertError value =
+ new BigQueryInsertError(
+ new TableRow().setF(Collections.singletonList(new
TableCell().setV("Value"))),
+ new TableDataInsertAllResponse.InsertErrors()
+ .setIndex(0L)
+ .setErrors(
+ Collections.singletonList(
+ new ErrorProto()
+ .setReason("a Reason")
+ .setLocation("A location")
+ .setMessage("A message")
+ .setDebugInfo("The debug info"))),
+ new TableReference()
+ .setProjectId("dummy-project-id")
+ .setDatasetId("dummy-dataset-id")
+ .setTableId("dummy-table-id"));
+
+ CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 15f54d1e0e1..731c8bf707c 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Verify.verifyNotNull;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -477,9 +478,9 @@ public void testExecuteWithRetries() throws IOException,
InterruptedException {
verify(response, times(1)).getContentType();
}
- private ValueInSingleWindow<TableRow> wrapTableRow(TableRow row) {
+ private <T> ValueInSingleWindow<T> wrapValue(T value) {
return ValueInSingleWindow.of(
- row,
+ value,
GlobalWindow.TIMESTAMP_MAX_VALUE,
GlobalWindow.INSTANCE,
PaneInfo.ON_TIME_AND_ONLY_FIRING);
@@ -491,7 +492,7 @@ public void testInsertRetry() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
- rows.add(wrapTableRow(new TableRow()));
+ rows.add(wrapValue(new TableRow()));
// First response is 403 rate limited, second response has valid payload.
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
@@ -509,6 +510,7 @@ public void testInsertRetry() throws Exception {
BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
new MockSleeper(),
InsertRetryPolicy.alwaysRetry(),
+ null,
null);
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
@@ -529,8 +531,7 @@ public void testInsertRetrySelectRows() throws Exception {
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows =
ImmutableList.of(
- wrapTableRow(new TableRow().set("row", "a")),
- wrapTableRow(new TableRow().set("row", "b")));
+ wrapValue(new TableRow().set("row", "a")), wrapValue(new
TableRow().set("row", "b")));
List<String> insertIds = ImmutableList.of("a", "b");
final TableDataInsertAllResponse bFailed =
@@ -556,6 +557,7 @@ public void testInsertRetrySelectRows() throws Exception {
BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
new MockSleeper(),
InsertRetryPolicy.alwaysRetry(),
+ null,
null);
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
@@ -568,7 +570,7 @@ public void testInsertFailsGracefully() throws Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows =
- ImmutableList.of(wrapTableRow(new TableRow()), wrapTableRow(new
TableRow()));
+ ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
final TableDataInsertAllResponse row1Failed =
new TableDataInsertAllResponse()
@@ -598,6 +600,7 @@ public void testInsertFailsGracefully() throws Exception {
BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
new MockSleeper(),
InsertRetryPolicy.alwaysRetry(),
+ null,
null);
fail();
} catch (IOException e) {
@@ -619,7 +622,7 @@ public void testInsertDoesNotRetry() throws Throwable {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
- rows.add(wrapTableRow(new TableRow()));
+ rows.add(wrapValue(new TableRow()));
// First response is 403 not-rate-limited, second response has valid
payload but should not
// be invoked.
@@ -643,6 +646,7 @@ public void testInsertDoesNotRetry() throws Throwable {
BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
new MockSleeper(),
InsertRetryPolicy.alwaysRetry(),
+ null,
null);
fail();
} catch (RuntimeException e) {
@@ -662,7 +666,7 @@ public void testInsertRetryPolicy() throws
InterruptedException, IOException {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows =
- ImmutableList.of(wrapTableRow(new TableRow()), wrapTableRow(new
TableRow()));
+ ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
// First time row0 fails with a retryable error, and row1 fails with a
persistent error.
final TableDataInsertAllResponse firstFailure =
@@ -711,7 +715,8 @@ public void testInsertRetryPolicy() throws
InterruptedException, IOException {
BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
new MockSleeper(),
InsertRetryPolicy.retryTransientErrors(),
- failedInserts);
+ failedInserts,
+ ErrorContainer.TABLE_ROW_ERROR_CONTAINER);
assertEquals(1, failedInserts.size());
expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
}
@@ -843,4 +848,99 @@ public void testCreateTableRetry() throws IOException {
"Quota limit reached when creating table project:dataset.table, "
+ "retrying up to 5.0 minutes");
}
+
+ /** Tests that {@link DatasetServiceImpl#insertAll} reports failed rows via
the plain {@link TableRow} {@link ErrorContainer}. */
+ @Test
+ public void testSimpleErrorRetrieval() throws InterruptedException,
IOException {
+ TableReference ref =
+ new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<ValueInSingleWindow<TableRow>> rows =
+ ImmutableList.of(
+ wrapValue(new TableRow().set("a", 1)), wrapValue(new
TableRow().set("b", 2)));
+
+ final TableDataInsertAllResponse failures =
+ new TableDataInsertAllResponse()
+ .setInsertErrors(
+ ImmutableList.of(
+ new InsertErrors()
+ .setIndex(0L)
+ .setErrors(ImmutableList.of(new
ErrorProto().setReason("timeout"))),
+ new InsertErrors()
+ .setIndex(1L)
+ .setErrors(ImmutableList.of(new
ErrorProto().setReason("invalid")))));
+
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+
+ when(response.getContent()).thenReturn(toStream(failures));
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+ List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList();
+ dataService.insertAll(
+ ref,
+ rows,
+ null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ new MockSleeper(),
+ InsertRetryPolicy.neverRetry(),
+ failedInserts,
+ ErrorContainer.TABLE_ROW_ERROR_CONTAINER);
+
+ assertThat(failedInserts, is(rows));
+ }
+
+ /** Tests that {@link DatasetServiceImpl#insertAll} reports failed rows via
the extended {@link BigQueryInsertError} {@link ErrorContainer}. */
+ @Test
+ public void testExtendedErrorRetrieval() throws InterruptedException,
IOException {
+ TableReference ref =
+ new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<ValueInSingleWindow<TableRow>> rows =
+ ImmutableList.of(
+ wrapValue(new TableRow().set("a", 1)), wrapValue(new
TableRow().set("b", 2)));
+
+ final TableDataInsertAllResponse failures =
+ new TableDataInsertAllResponse()
+ .setInsertErrors(
+ ImmutableList.of(
+ new InsertErrors()
+ .setIndex(0L)
+ .setErrors(ImmutableList.of(new
ErrorProto().setReason("timeout"))),
+ new InsertErrors()
+ .setIndex(1L)
+ .setErrors(ImmutableList.of(new
ErrorProto().setReason("invalid")))));
+
+ final List<ValueInSingleWindow<BigQueryInsertError>> expected =
+ ImmutableList.of(
+ wrapValue(
+ new BigQueryInsertError(
+ rows.get(0).getValue(), failures.getInsertErrors().get(0),
ref)),
+ wrapValue(
+ new BigQueryInsertError(
+ rows.get(1).getValue(), failures.getInsertErrors().get(1),
ref)));
+
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+
+ when(response.getContent()).thenReturn(toStream(failures));
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+ List<ValueInSingleWindow<BigQueryInsertError>> failedInserts =
Lists.newArrayList();
+ dataService.insertAll(
+ ref,
+ rows,
+ null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ new MockSleeper(),
+ InsertRetryPolicy.neverRetry(),
+ failedInserts,
+ ErrorContainer.BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER);
+
+ assertThat(failedInserts, is(expected));
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 23c9509ac0e..48c92064d01 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -203,7 +203,8 @@ public void testInsertAll() throws Exception {
long totalBytes = 0;
try {
- totalBytes = datasetService.insertAll(ref, rows, ids,
InsertRetryPolicy.alwaysRetry(), null);
+ totalBytes =
+ datasetService.insertAll(ref, rows, ids,
InsertRetryPolicy.alwaysRetry(), null, null);
} finally {
verifyInsertAll(5);
// Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index d57e8667dda..33a24e7a06b 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -39,7 +39,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -47,8 +46,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
/** A fake dataset service that can be serialized, for use in
testReadFromTable. */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class FakeDatasetService implements DatasetService, Serializable {
+class FakeDatasetService implements DatasetService, Serializable {
// Table information must be static, as each ParDo will get a separate
instance of
// FakeDatasetServices, and they must all modify the same storage.
static com.google.common.collect.Table<String, String, Map<String,
TableContainer>> tables;
@@ -75,7 +73,7 @@ public Table getTable(TableReference tableRef) throws
InterruptedException, IOEx
}
}
- public List<TableRow> getAllRows(String projectId, String datasetId, String
tableId)
+ List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
throws InterruptedException, IOException {
synchronized (tables) {
return getTableContainer(projectId, datasetId, tableId).getRows();
@@ -206,16 +204,17 @@ public long insertAll(
GlobalWindow.INSTANCE,
PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
- return insertAll(ref, windowedRows, insertIdList,
InsertRetryPolicy.alwaysRetry(), null);
+ return insertAll(ref, windowedRows, insertIdList,
InsertRetryPolicy.alwaysRetry(), null, null);
}
@Override
- public long insertAll(
+ public <T> long insertAll(
TableReference ref,
List<ValueInSingleWindow<TableRow>> rowList,
@Nullable List<String> insertIdList,
InsertRetryPolicy retryPolicy,
- List<ValueInSingleWindow<TableRow>> failedInserts)
+ List<ValueInSingleWindow<T>> failedInserts,
+ ErrorContainer<T> errorContainer)
throws IOException, InterruptedException {
Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> insertErrors
= getInsertErrors();
synchronized (tables) {
@@ -248,7 +247,8 @@ public long insertAll(
if (shouldInsert) {
dataSize += tableContainer.addRow(row, insertIdList.get(i));
} else {
- failedInserts.add(rowList.get(i));
+ errorContainer.add(
+ failedInserts, allErrors.get(allErrors.size() - 1), ref,
rowList.get(i));
}
}
return dataSize;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 125793)
Time Spent: 4.5h (was: 4h 20m)
> Add error reason and table destination to BigQueryIO streaming failed inserts
> -----------------------------------------------------------------------------
>
> Key: BEAM-4257
> URL: https://issues.apache.org/jira/browse/BEAM-4257
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Carlos Alonso
> Assignee: Carlos Alonso
> Priority: Minor
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> When using `BigQueryIO.Write` and getting `WriteResult.getFailedInserts()` we
> get a `PCollection<TableRow>` which is fine, but in order to properly work on
> the errors downstream having extended information such as the `InsertError`
> fields and the `TableReference` it was routed to would be really valuable.
>
> My suggestion is to create a new object that contains all that information
> and return a `PCollection` of those instead.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)