Repository: beam Updated Branches: refs/heads/master eee6726aa -> 53c9bf4cd
Dead-letter support for BigQuery. Allow users to specify a retry policy, and get failed inserts back. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a57ff0ee Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a57ff0ee Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a57ff0ee Branch: refs/heads/master Commit: a57ff0eef1b7166d02243e19c4632f8924a014f0 Parents: eee6726 Author: Reuven Lax <[email protected]> Authored: Thu May 11 06:40:47 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Jun 2 13:15:01 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 8 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 22 ++++- .../sdk/io/gcp/bigquery/BigQueryServices.java | 8 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 39 +++++---- .../sdk/io/gcp/bigquery/InsertRetryPolicy.java | 86 +++++++++++++++++++ .../sdk/io/gcp/bigquery/StreamingInserts.java | 35 ++++++-- .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 50 ++++++++--- .../io/gcp/bigquery/StreamingWriteTables.java | 26 ++++-- .../beam/sdk/io/gcp/bigquery/WriteResult.java | 27 ++++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 56 +++++++++++++ .../gcp/bigquery/BigQueryServicesImplTest.java | 88 +++++++++++++++++--- .../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 11 ++- .../sdk/io/gcp/bigquery/FakeDatasetService.java | 75 ++++++++++++++++- .../io/gcp/bigquery/InsertRetryPolicyTest.java | 79 ++++++++++++++++++ 14 files changed, 540 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 0abd469..3686f99 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */ class BatchLoads<DestinationT> @@ -248,7 +249,8 @@ class BatchLoads<DestinationT> // This transform will look at the set of files written for each table, and if any table has // too many files or bytes, will partition that table's files into multiple partitions for // loading. 
- PCollection<Void> singleton = p.apply(Create.of((Void) null).withCoder(VoidCoder.of())); + PCollection<Void> singleton = p.apply("singleton", + Create.of((Void) null).withCoder(VoidCoder.of())); PCollectionTuple partitions = singleton.apply( "WritePartition", @@ -333,6 +335,8 @@ class BatchLoads<DestinationT> dynamicDestinations)) .withSideInputs(writeTablesSideInputs)); - return WriteResult.in(input.getPipeline()); + PCollection<TableRow> empty = + p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class))); + return WriteResult.in(input.getPipeline(), new TupleTag<TableRow>("failedInserts"), empty); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index cf258ca..6a93279 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -674,6 +674,7 @@ public class BigQueryIO { abstract BigQueryServices getBigQueryServices(); @Nullable abstract Integer getMaxFilesPerBundle(); @Nullable abstract Long getMaxFileSize(); + @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy(); abstract Builder<T> toBuilder(); @@ -693,6 +694,7 @@ public class BigQueryIO { abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices); abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle); abstract Builder<T> setMaxFileSize(Long maxFileSize); + abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy); abstract Write<T> build(); } @@ -861,6 +863,17 @@ 
public class BigQueryIO { return toBuilder().setTableDescription(tableDescription).build(); } + /** Specfies a policy for handling failed inserts. + * + * <p>Currently this only is allowed when writing an unbounded collection to BigQuery. Bounded + * collections are written using batch load jobs, so we don't get per-element failures. + * Unbounded collections are written using streaming inserts, so we have access to per-element + * insert results. + */ + public Write<T> withFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy) { + return toBuilder().setFailedInsertRetryPolicy(retryPolicy).build(); + } + /** Disables BigQuery table validation. */ public Write<T> withoutValidation() { return toBuilder().setValidate(false).build(); @@ -935,6 +948,7 @@ public class BigQueryIO { "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " + "be set"); + DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); if (dynamicDestinations == null) { if (getJsonTableRef() != null) { @@ -981,10 +995,14 @@ public class BigQueryIO { "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded" + " PCollection."); StreamingInserts<DestinationT> streamingInserts = - new StreamingInserts<>(getCreateDisposition(), dynamicDestinations); - streamingInserts.setTestServices(getBigQueryServices()); + new StreamingInserts<>(getCreateDisposition(), dynamicDestinations) + .withInsertRetryPolicy(getFailedInsertRetryPolicy()) + .withTestServices((getBigQueryServices())); return rowsWithDestination.apply(streamingInserts); } else { + checkArgument(getFailedInsertRetryPolicy() == null, + "Record-insert retry policies are not supported when using BigQuery load jobs."); + BatchLoads<DestinationT> batchLoads = new BatchLoads<>( getWriteDisposition(), getCreateDisposition(), http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java 
---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 1ae10bc..c067229 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -33,6 +33,7 @@ import java.io.Serializable; import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.sdk.values.ValueInSingleWindow; /** An interface for real, mock, or fake implementations of Cloud BigQuery services. */ interface BigQueryServices extends Serializable { @@ -161,9 +162,14 @@ interface BigQueryServices extends Serializable { /** * Inserts {@link TableRow TableRows} with the specified insertIds if not null. * + * <p>If any insert fail permanently according to the retry policy, those rows are added + * to failedInserts. + * * <p>Returns the total bytes count of {@link TableRow TableRows}. */ - long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) + long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> rowList, + @Nullable List<String> insertIdList, InsertRetryPolicy retryPolicy, + List<ValueInSingleWindow<TableRow>> failedInserts) throws IOException, InterruptedException; /** Patch BigQuery {@link Table} description. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 5d5a519..b14405e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -69,6 +69,7 @@ import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -656,8 +657,11 @@ class BigQueryServicesImpl implements BigQueryServices { } @VisibleForTesting - long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList, - BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException { + long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> rowList, + @Nullable List<String> insertIdList, + BackOff backoff, final Sleeper sleeper, InsertRetryPolicy retryPolicy, + List<ValueInSingleWindow<TableRow>> failedInserts) + throws IOException, InterruptedException { checkNotNull(ref, "ref"); if (executor == null) { this.executor = options.as(GcsOptions.class).getExecutorService(); @@ -671,10 +675,10 @@ class BigQueryServicesImpl implements BigQueryServices { List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>(); // These lists contain the rows to publish. 
Initially the contain the entire list. // If there are failures, they will contain only the failed rows to be retried. - List<TableRow> rowsToPublish = rowList; + List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList; List<String> idsToPublish = insertIdList; while (true) { - List<TableRow> retryRows = new ArrayList<>(); + List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>(); List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null; int strideIndex = 0; @@ -686,7 +690,7 @@ class BigQueryServicesImpl implements BigQueryServices { List<Integer> strideIndices = new ArrayList<>(); for (int i = 0; i < rowsToPublish.size(); ++i) { - TableRow row = rowsToPublish.get(i); + TableRow row = rowsToPublish.get(i).getValue(); TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows(); if (idsToPublish != null) { out.setInsertId(idsToPublish.get(i)); @@ -743,18 +747,23 @@ class BigQueryServicesImpl implements BigQueryServices { try { for (int i = 0; i < futures.size(); i++) { List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get(); - if (errors != null) { - for (TableDataInsertAllResponse.InsertErrors error : errors) { - allErrors.add(error); - if (error.getIndex() == null) { - throw new IOException("Insert failed: " + allErrors); - } + if (errors == null) { + continue; + } + for (TableDataInsertAllResponse.InsertErrors error : errors) { + if (error.getIndex() == null) { + throw new IOException("Insert failed: " + error + ", other errors: " + allErrors); + } - int errorIndex = error.getIndex().intValue() + strideIndices.get(i); + int errorIndex = error.getIndex().intValue() + strideIndices.get(i); + if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) { + allErrors.add(error); retryRows.add(rowsToPublish.get(errorIndex)); if (retryIds != null) { retryIds.add(idsToPublish.get(errorIndex)); } + } else { + failedInserts.add(rowsToPublish.get(errorIndex)); } } } @@ -793,13 +802,15 @@ class 
BigQueryServicesImpl implements BigQueryServices { @Override public long insertAll( - TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) + TableReference ref, List<ValueInSingleWindow<TableRow>> rowList, + @Nullable List<String> insertIdList, + InsertRetryPolicy retryPolicy, List<ValueInSingleWindow<TableRow>> failedInserts) throws IOException, InterruptedException { return insertAll( ref, rowList, insertIdList, BackOffAdapter.toGcpBackOff( INSERT_BACKOFF_FACTORY.backoff()), - Sleeper.DEFAULT); + Sleeper.DEFAULT, retryPolicy, failedInserts); } http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java new file mode 100644 index 0000000..90a3d0d --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.ErrorProto; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; +import com.google.common.collect.ImmutableSet; +import java.io.Serializable; +import java.util.Set; + +/** A retry policy for streaming BigQuery inserts. */ +public abstract class InsertRetryPolicy implements Serializable { + /** + * Contains information about a failed insert. + * + * <p>Currently only the list of errors returned from BigQuery. In the future this may contain + * more information - e.g. how many times this insert has been retried, and for how long. + */ + public static class Context { + // A list of all errors corresponding to an attempted insert of a single record. + TableDataInsertAllResponse.InsertErrors errors; + + public Context(TableDataInsertAllResponse.InsertErrors errors) { + this.errors = errors; + } + } + + // A list of known persistent errors for which retrying never helps. + static final Set<String> PERSISTENT_ERRORS = + ImmutableSet.of("invalid", "invalidQuery", "notImplemented"); + + /** Return true if this failure should be retried. */ + public abstract boolean shouldRetry(Context context); + + /** Never retry any failures. */ + public static InsertRetryPolicy neverRetry() { + return new InsertRetryPolicy() { + @Override + public boolean shouldRetry(Context context) { + return false; + } + }; + } + + /** Always retry all failures. */ + public static InsertRetryPolicy alwaysRetry() { + return new InsertRetryPolicy() { + @Override + public boolean shouldRetry(Context context) { + return true; + } + }; + } + + /** Retry all failures except for known persistent errors. 
*/ + public static InsertRetryPolicy retryTransientErrors() { + return new InsertRetryPolicy() { + @Override + public boolean shouldRetry(Context context) { + if (context.errors.getErrors() != null) { + for (ErrorProto error : context.errors.getErrors()) { + if (error.getReason() != null && PERSISTENT_ERRORS.contains(error.getReason())) { + return false; + } + } + } + return true; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java index 9cb0027..ba09cb3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java @@ -35,19 +35,39 @@ public class StreamingInserts<DestinationT> private BigQueryServices bigQueryServices; private final CreateDisposition createDisposition; private final DynamicDestinations<?, DestinationT> dynamicDestinations; + private InsertRetryPolicy retryPolicy; /** Constructor. */ - StreamingInserts(CreateDisposition createDisposition, + public StreamingInserts(CreateDisposition createDisposition, DynamicDestinations<?, DestinationT> dynamicDestinations) { + this(createDisposition, dynamicDestinations, new BigQueryServicesImpl(), + InsertRetryPolicy.alwaysRetry()); + } + + /** Constructor. 
*/ + private StreamingInserts(CreateDisposition createDisposition, + DynamicDestinations<?, DestinationT> dynamicDestinations, + BigQueryServices bigQueryServices, + InsertRetryPolicy retryPolicy) { this.createDisposition = createDisposition; this.dynamicDestinations = dynamicDestinations; - this.bigQueryServices = new BigQueryServicesImpl(); + this.bigQueryServices = bigQueryServices; + this.retryPolicy = retryPolicy; } - void setTestServices(BigQueryServices bigQueryServices) { - this.bigQueryServices = bigQueryServices; + /** + * Specify a retry policy for failed inserts. + */ + public StreamingInserts<DestinationT> withInsertRetryPolicy(InsertRetryPolicy retryPolicy) { + return new StreamingInserts<>( + createDisposition, dynamicDestinations, bigQueryServices, retryPolicy); } + StreamingInserts<DestinationT> withTestServices(BigQueryServices bigQueryServices) { + return new StreamingInserts<>( + createDisposition, dynamicDestinations, bigQueryServices, retryPolicy); } + + @Override protected Coder<Void> getDefaultOutputCoder() { return VoidCoder.of(); @@ -58,9 +78,12 @@ public class StreamingInserts<DestinationT> PCollection<KV<TableDestination, TableRow>> writes = input.apply( "CreateTables", - new CreateTables<DestinationT>(createDisposition, dynamicDestinations) + new CreateTables<>(createDisposition, dynamicDestinations) .withTestServices(bigQueryServices)); - return writes.apply(new StreamingWriteTables().withTestServices(bigQueryServices)); + return writes.apply( + new StreamingWriteTables() + .withTestServices(bigQueryServices) + .withInsertRetryPolicy(retryPolicy)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java index f267976..63e5bc1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -29,8 +30,11 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueInSingleWindow; /** * Implementation of DoFn to perform streaming BigQuery write. @@ -40,9 +44,12 @@ import org.apache.beam.sdk.values.KV; class StreamingWriteFn extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { private final BigQueryServices bqServices; + private final InsertRetryPolicy retryPolicy; + private final TupleTag<TableRow> failedOutputTag; + /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ - private transient Map<String, List<TableRow>> tableRows; + private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows; /** The list of unique ids for each BigQuery table row. */ private transient Map<String, List<String>> uniqueIdsForTableRows; @@ -50,8 +57,11 @@ class StreamingWriteFn /** Tracks bytes written, exposed as "ByteCount" Counter. 
*/ private Counter byteCounter = SinkMetrics.bytesWritten(); - StreamingWriteFn(BigQueryServices bqServices) { + StreamingWriteFn(BigQueryServices bqServices, InsertRetryPolicy retryPolicy, + TupleTag<TableRow> failedOutputTag) { this.bqServices = bqServices; + this.retryPolicy = retryPolicy; + this.failedOutputTag = failedOutputTag; } /** Prepares a target BigQuery table. */ @@ -63,27 +73,39 @@ class StreamingWriteFn /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */ @ProcessElement - public void processElement(ProcessContext context) { + public void processElement(ProcessContext context, BoundedWindow window) { String tableSpec = context.element().getKey().getKey(); - List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); - List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, - tableSpec); + List<ValueInSingleWindow<TableRow>> rows = + BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); + List<String> uniqueIds = + BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec); - rows.add(context.element().getValue().tableRow); + rows.add( + ValueInSingleWindow.of( + context.element().getValue().tableRow, context.timestamp(), window, context.pane())); uniqueIds.add(context.element().getValue().uniqueId); } /** Writes the accumulated rows into BigQuery with streaming API. 
*/ @FinishBundle public void finishBundle(FinishBundleContext context) throws Exception { + List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList(); BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); - for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) { + for (Map.Entry<String, List<ValueInSingleWindow<TableRow>>> entry : tableRows.entrySet()) { TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey()); - flushRows(tableReference, entry.getValue(), - uniqueIdsForTableRows.get(entry.getKey()), options); + flushRows( + tableReference, + entry.getValue(), + uniqueIdsForTableRows.get(entry.getKey()), + options, + failedInserts); } tableRows.clear(); uniqueIdsForTableRows.clear(); + + for (ValueInSingleWindow<TableRow> row : failedInserts) { + context.output(failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow()); + } } @Override @@ -95,12 +117,14 @@ class StreamingWriteFn * Writes the accumulated rows into BigQuery with streaming API. 
*/ private void flushRows(TableReference tableReference, - List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) - throws InterruptedException { + List<ValueInSingleWindow<TableRow>> tableRows, + List<String> uniqueIds, BigQueryOptions options, + List<ValueInSingleWindow<TableRow>> failedInserts) + throws InterruptedException { if (!tableRows.isEmpty()) { try { long totalBytes = bqServices.getDatasetService(options).insertAll( - tableReference, tableRows, uniqueIds); + tableReference, tableRows, uniqueIds, retryPolicy, failedInserts); byteCounter.inc(totalBytes); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java index 886236b..18b2033 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -28,6 +28,9 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; /** * This transform takes in key-value pairs of {@link TableRow} entries and the @@ -40,17 +43,23 @@ import org.apache.beam.sdk.values.PCollection; public class StreamingWriteTables extends PTransform< PCollection<KV<TableDestination, 
TableRow>>, WriteResult> { private BigQueryServices bigQueryServices; + private InsertRetryPolicy retryPolicy; public StreamingWriteTables() { - this(new BigQueryServicesImpl()); + this(new BigQueryServicesImpl(), InsertRetryPolicy.alwaysRetry()); } - private StreamingWriteTables(BigQueryServices bigQueryServices) { + private StreamingWriteTables(BigQueryServices bigQueryServices, InsertRetryPolicy retryPolicy) { this.bigQueryServices = bigQueryServices; + this.retryPolicy = retryPolicy; } StreamingWriteTables withTestServices(BigQueryServices bigQueryServices) { - return new StreamingWriteTables(bigQueryServices); + return new StreamingWriteTables(bigQueryServices, retryPolicy); + } + + StreamingWriteTables withInsertRetryPolicy(InsertRetryPolicy retryPolicy) { + return new StreamingWriteTables(bigQueryServices, retryPolicy); } @Override @@ -77,7 +86,9 @@ public class StreamingWriteTables extends PTransform< // different unique ids, this implementation relies on "checkpointing", which is // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, // performed by Reshuffle. 
- tagged + TupleTag<Void> mainOutputTag = new TupleTag<>("mainOutput"); + TupleTag<TableRow> failedInsertsTag = new TupleTag<>("failedInserts"); + PCollectionTuple tuple = tagged .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) // Put in the global window to ensure that DynamicDestinations side inputs are accessed @@ -87,7 +98,10 @@ public class StreamingWriteTables extends PTransform< .triggering(DefaultTrigger.of()).discardingFiredPanes()) .apply("StreamingWrite", ParDo.of( - new StreamingWriteFn(bigQueryServices))); - return WriteResult.in(input.getPipeline()); + new StreamingWriteFn(bigQueryServices, retryPolicy, failedInsertsTag)) + .withOutputTags(mainOutputTag, TupleTagList.of(failedInsertsTag))); + PCollection<TableRow> failedInserts = tuple.get(failedInsertsTag); + failedInserts.setCoder(TableRowJsonCoder.of()); + return WriteResult.in(input.getPipeline(), failedInsertsTag, failedInserts); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java index db0be3a..4f6b23e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java @@ -17,10 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import java.util.Collections; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.beam.sdk.Pipeline; import 
org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; @@ -30,23 +32,30 @@ import org.apache.beam.sdk.values.TupleTag; * The result of a {@link BigQueryIO.Write} transform. */ public final class WriteResult implements POutput { - private final Pipeline pipeline; + private final TupleTag<TableRow> failedInsertsTag; + private final PCollection<TableRow> failedInserts; - /** - * Creates a {@link WriteResult} in the given {@link Pipeline}. - */ - static WriteResult in(Pipeline pipeline) { - return new WriteResult(pipeline); + /** Creates a {@link WriteResult} in the given {@link Pipeline}. */ + static WriteResult in( + Pipeline pipeline, TupleTag<TableRow> failedInsertsTag, PCollection<TableRow> failedInserts) { + return new WriteResult(pipeline, failedInsertsTag, failedInserts); } @Override public Map<TupleTag<?>, PValue> expand() { - return Collections.emptyMap(); + return ImmutableMap.<TupleTag<?>, PValue>of(failedInsertsTag, failedInserts); } - private WriteResult(Pipeline pipeline) { + private WriteResult( + Pipeline pipeline, TupleTag<TableRow> failedInsertsTag, PCollection<TableRow> failedInserts) { this.pipeline = pipeline; + this.failedInsertsTag = failedInsertsTag; + this.failedInserts = failedInserts; + } + + public PCollection<TableRow> getFailedInserts() { + return failedInserts; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 5408fd4..04bbac4 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -35,12 +35,14 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.api.client.util.Data; +import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatistics2; import com.google.api.services.bigquery.model.JobStatistics4; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; @@ -126,6 +128,7 @@ import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.TupleTag; @@ -602,6 +605,59 @@ public class BigQueryIOTest implements Serializable { } @Test + public void testRetryPolicy() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("project-id"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); + + 
datasetService.createDataset("project-id", "dataset-id", "", ""); + + TableRow row1 = new TableRow().set("name", "a").set("number", "1"); + TableRow row2 = new TableRow().set("name", "b").set("number", "2"); + TableRow row3 = new TableRow().set("name", "c").set("number", "3"); + + TableDataInsertAllResponse.InsertErrors ephemeralError = + new TableDataInsertAllResponse.InsertErrors().setErrors( + ImmutableList.of(new ErrorProto().setReason("timeout"))); + TableDataInsertAllResponse.InsertErrors persistentError = + new TableDataInsertAllResponse.InsertErrors().setErrors( + ImmutableList.of(new ErrorProto().setReason("invalidQuery"))); + + datasetService.failOnInsert( + ImmutableMap.<TableRow, List<TableDataInsertAllResponse.InsertErrors>>of( + row1, ImmutableList.of(ephemeralError, ephemeralError), + row2, ImmutableList.of(ephemeralError, ephemeralError, persistentError))); + + Pipeline p = TestPipeline.create(bqOptions); + PCollection<TableRow> failedRows = + p.apply(Create.of(row1, row2, row3)) + .setIsBoundedInternal(IsBounded.UNBOUNDED) + .apply(BigQueryIO.writeTableRows().to("project-id:dataset-id.table-id") + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) + .withTestServices(fakeBqServices) + .withoutValidation()).getFailedInserts(); + // row2 finally fails with a non-retryable error, so we expect to see it in the collection of + // failed rows. + PAssert.that(failedRows).containsInAnyOrder(row2); + p.run(); + + // Only row1 and row3 were successfully inserted. 
+ assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder(row1, row3)); + + } + + @Test public void testWrite() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultproject"); http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index b41490f..f602038 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -58,6 +58,7 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.cloud.hadoop.util.RetryBoundedBackOff; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -67,11 +68,14 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceIm import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FastNanoClockAndSleeper; import 
org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; @@ -485,6 +489,11 @@ public class BigQueryServicesImplTest { verify(response, times(1)).getContentType(); } + private ValueInSingleWindow<TableRow> wrapTableRow(TableRow row) { + return ValueInSingleWindow.of(row, GlobalWindow.TIMESTAMP_MAX_VALUE, + GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING); + } + /** * Tests that {@link DatasetServiceImpl#insertAll} retries quota rate limited attempts. */ @@ -492,8 +501,8 @@ public class BigQueryServicesImplTest { public void testInsertRetry() throws Exception { TableReference ref = new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - List<TableRow> rows = new ArrayList<>(); - rows.add(new TableRow()); + List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>(); + rows.add(wrapTableRow(new TableRow())); // First response is 403 rate limited, second response has valid payload. 
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); @@ -505,7 +514,8 @@ public class BigQueryServicesImplTest { DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); dataService.insertAll(ref, rows, null, - BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(), + InsertRetryPolicy.alwaysRetry(), null); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); @@ -524,8 +534,9 @@ public class BigQueryServicesImplTest { public void testInsertRetrySelectRows() throws Exception { TableReference ref = new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - List<TableRow> rows = ImmutableList.of( - new TableRow().set("row", "a"), new TableRow().set("row", "b")); + List<ValueInSingleWindow<TableRow>> rows = ImmutableList.of( + wrapTableRow(new TableRow().set("row", "a")), + wrapTableRow(new TableRow().set("row", "b"))); List<String> insertIds = ImmutableList.of("a", "b"); final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse() @@ -542,11 +553,11 @@ public class BigQueryServicesImplTest { DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); dataService.insertAll(ref, rows, insertIds, - BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(), + InsertRetryPolicy.alwaysRetry(), null); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); - expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); } /** @@ -556,7 +567,8 @@ public class BigQueryServicesImplTest { public void testInsertFailsGracefully() throws Exception { TableReference ref = new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - List<TableRow> rows = ImmutableList.of(new TableRow(), new TableRow()); + List<ValueInSingleWindow<TableRow>> rows = ImmutableList.of( + wrapTableRow(new TableRow()), wrapTableRow(new TableRow())); final TableDataInsertAllResponse row1Failed = new TableDataInsertAllResponse() .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L))); @@ -584,7 +596,8 @@ public class BigQueryServicesImplTest { // Expect it to fail. try { dataService.insertAll(ref, rows, null, - BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(), + InsertRetryPolicy.alwaysRetry(), null); fail(); } catch (IOException e) { assertThat(e, instanceOf(IOException.class)); @@ -606,8 +619,8 @@ public class BigQueryServicesImplTest { public void testInsertDoesNotRetry() throws Throwable { TableReference ref = new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - List<TableRow> rows = new ArrayList<>(); - rows.add(new TableRow()); + List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>(); + rows.add(wrapTableRow(new TableRow())); // First response is 403 not-rate-limited, second response has valid payload but should not // be invoked. @@ -625,7 +638,8 @@ public class BigQueryServicesImplTest { try { dataService.insertAll(ref, rows, null, - BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(), + InsertRetryPolicy.alwaysRetry(), null); fail(); } catch (RuntimeException e) { verify(response, times(1)).getStatusCode(); @@ -635,6 +649,56 @@ public class BigQueryServicesImplTest { } } + /** + * Tests that {@link DatasetServiceImpl#insertAll} uses the supplied {@link InsertRetryPolicy}, + * and returns the list of rows not retried. 
+ */ + @Test + public void testInsertRetryPolicy() throws InterruptedException, IOException { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + List<ValueInSingleWindow<TableRow>> rows = ImmutableList.of( + wrapTableRow(new TableRow()), wrapTableRow(new TableRow())); + + // First time row0 fails with a retryable error, and row1 fails with a persistent error. + final TableDataInsertAllResponse firstFailure = new TableDataInsertAllResponse() + .setInsertErrors(ImmutableList.of( + new InsertErrors().setIndex(0L).setErrors( + ImmutableList.of(new ErrorProto().setReason("timeout"))), + new InsertErrors().setIndex(1L).setErrors( + ImmutableList.of(new ErrorProto().setReason("invalid"))))); + + // Second time there is only one row, which fails with a retryable error. + final TableDataInsertAllResponse secondFialure = new TableDataInsertAllResponse() + .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(0L).setErrors( + ImmutableList.of(new ErrorProto().setReason("timeout"))))); + + // On the final attempt, no failures are returned. + final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse(); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + // Always return 200. 
+ when(response.getStatusCode()).thenReturn(200); + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200).thenReturn(200); + + // First fail + when(response.getContent()) + .thenReturn(toStream(firstFailure)) + .thenReturn(toStream(secondFialure)) + .thenReturn(toStream(allRowsSucceeded)); + + DatasetServiceImpl dataService = + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + + List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList(); + dataService.insertAll(ref, rows, null, + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper(), + InsertRetryPolicy.retryTransientErrors(), failedInserts); + assertEquals(1, failedInserts.size()); + expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); + } + /** A helper to wrap a {@link GenericJson} object in a content stream. */ private static InputStream toStream(GenericJson content) throws IOException { return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content)); http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java index fa84119..43290dc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java @@ -49,6 +49,9 @@ import java.util.List; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl; import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; @@ -391,16 +394,18 @@ public class BigQueryUtilTest { .parseTableSpec("project:dataset.table"); DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, options, 5); - List<TableRow> rows = new ArrayList<>(); + List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>(); List<String> ids = new ArrayList<>(); for (int i = 0; i < 25; ++i) { - rows.add(rawRow("foo", 1234)); + rows.add(ValueInSingleWindow.of(rawRow("foo", 1234), GlobalWindow.TIMESTAMP_MAX_VALUE, + GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); ids.add(new String()); } long totalBytes = 0; try { - totalBytes = datasetService.insertAll(ref, rows, ids); + totalBytes = datasetService.insertAll(ref, rows, ids, InsertRetryPolicy.alwaysRetry(), + null); } finally { verifyInsertAll(5); // Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}" http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java index 5103adb..6ee5340 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java @@ -25,9 +25,11 @@ import com.google.api.client.http.HttpHeaders; import 
com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; @@ -36,9 +38,15 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.ValueInSingleWindow; /** A fake dataset service that can be serialized, for use in testReadFromTable. 
*/ class FakeDatasetService implements DatasetService, Serializable { + Map<String, List<String>> insertErrors = Maps.newHashMap(); + @Override public Table getTable(TableReference tableRef) throws InterruptedException, IOException { @@ -164,10 +172,24 @@ class FakeDatasetService implements DatasetService, Serializable { } } + public long insertAll(TableReference ref, List<TableRow> rowList, + @Nullable List<String> insertIdList) + throws IOException, InterruptedException { + List<ValueInSingleWindow<TableRow>> windowedRows = Lists.newArrayList(); + for (TableRow row : rowList) { + windowedRows.add(ValueInSingleWindow.of(row, GlobalWindow.TIMESTAMP_MAX_VALUE, + GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + return insertAll(ref, windowedRows, insertIdList, InsertRetryPolicy.alwaysRetry(), null); + } + @Override public long insertAll( - TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) + TableReference ref, List<ValueInSingleWindow<TableRow>> rowList, + @Nullable List<String> insertIdList, + InsertRetryPolicy retryPolicy, List<ValueInSingleWindow<TableRow>> failedInserts) throws IOException, InterruptedException { + Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> insertErrors = getInsertErrors(); synchronized (BigQueryIOTest.tables) { if (insertIdList != null) { assertEquals(rowList.size(), insertIdList.size()); @@ -182,7 +204,21 @@ class FakeDatasetService implements DatasetService, Serializable { TableContainer tableContainer = getTableContainer( ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); for (int i = 0; i < rowList.size(); ++i) { - dataSize += tableContainer.addRow(rowList.get(i), insertIdList.get(i)); + TableRow row = rowList.get(i).getValue(); + List<TableDataInsertAllResponse.InsertErrors> allErrors = insertErrors.get(row); + boolean shouldInsert = true; + if (allErrors != null) { + for (TableDataInsertAllResponse.InsertErrors errors : allErrors) { + if (!retryPolicy.shouldRetry(new 
Context(errors))) { + shouldInsert = false; + } + } + } + if (shouldInsert) { + dataSize += tableContainer.addRow(row, insertIdList.get(i)); + } else { + failedInserts.add(rowList.get(i)); + } } return dataSize; } @@ -200,6 +236,41 @@ class FakeDatasetService implements DatasetService, Serializable { } } + /** + * Cause a given {@link TableRow} object to fail when it's inserted. The errors in the list + * will be returned on subsequent retries, and the insert will succeed when the errors run out. + */ + public void failOnInsert( + Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> insertErrors) { + synchronized (BigQueryIOTest.tables) { + for (Map.Entry<TableRow, List<TableDataInsertAllResponse.InsertErrors>> entry + : insertErrors.entrySet()) { + List<String> errorStrings = Lists.newArrayList(); + for (TableDataInsertAllResponse.InsertErrors errors : entry.getValue()) { + errorStrings.add(BigQueryHelpers.toJsonString(errors)); + } + this.insertErrors.put(BigQueryHelpers.toJsonString(entry.getKey()), errorStrings); + } + } + } + + Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> getInsertErrors() { + Map<TableRow, List<TableDataInsertAllResponse.InsertErrors>> parsedInsertErrors = + Maps.newHashMap(); + synchronized (BigQueryIOTest.tables) { + for (Map.Entry<String, List<String>> entry : this.insertErrors.entrySet()) { + TableRow tableRow = BigQueryHelpers.fromJsonString(entry.getKey(), TableRow.class); + List<TableDataInsertAllResponse.InsertErrors> allErrors = Lists.newArrayList(); + for (String errorsString : entry.getValue()) { + allErrors.add(BigQueryHelpers.fromJsonString( + errorsString, TableDataInsertAllResponse.InsertErrors.class)); + } + parsedInsertErrors.put(tableRow, allErrors); + } + } + return parsedInsertErrors; + } + + void throwNotFound(String format, Object...
args) throws IOException { throw new IOException( new GoogleJsonResponseException.Builder(404, http://git-wip-us.apache.org/repos/asf/beam/blob/a57ff0ee/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java new file mode 100644 index 0000000..b19835d --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicyTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.api.services.bigquery.model.ErrorProto; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link InsertRetryPolicy}. + */ +@RunWith(JUnit4.class) +public class InsertRetryPolicyTest { + @Test + public void testNeverRetry() { + assertFalse(InsertRetryPolicy.neverRetry().shouldRetry( + new Context(new TableDataInsertAllResponse.InsertErrors()))); + } + + @Test + public void testAlwaysRetry() { + assertTrue(InsertRetryPolicy.alwaysRetry().shouldRetry( + new Context(new TableDataInsertAllResponse.InsertErrors()))); + } + + @Test + public void testDontRetryPersistentErrors() { + InsertRetryPolicy policy = InsertRetryPolicy.retryTransientErrors(); + assertTrue(policy.shouldRetry(new Context(generateErrorAmongMany( + 5, "timeout", "unavailable")))); + assertFalse(policy.shouldRetry(new Context(generateErrorAmongMany( + 5, "timeout", "invalid")))); + assertFalse(policy.shouldRetry(new Context(generateErrorAmongMany( + 5, "timeout", "invalidQuery")))); + assertFalse(policy.shouldRetry(new Context(generateErrorAmongMany( + 5, "timeout", "notImplemented")))); + } + + private TableDataInsertAllResponse.InsertErrors generateErrorAmongMany( + int numErrors, String baseReason, String exceptionalReason) { + // The retry policies are expected to search through the entire list of ErrorProtos to determine + // whether to retry. Stick the exceptionalReason in a random position to exercise this. 
+ List<ErrorProto> errorProtos = Lists.newArrayListWithExpectedSize(numErrors); + int exceptionalPosition = ThreadLocalRandom.current().nextInt(numErrors); + for (int i = 0; i < numErrors; ++i) { + ErrorProto error = new ErrorProto(); + error.setReason((i == exceptionalPosition) ? exceptionalReason : baseReason); + errorProtos.add(error); + } + TableDataInsertAllResponse.InsertErrors errors = new TableDataInsertAllResponse.InsertErrors(); + errors.setErrors(errorProtos); + return errors; + } +}
