Refactor BigQueryIO.Write helper transforms into separate files.
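(Context for the diff below: after this refactoring, BigQueryIO.Write.expand() only dispatches. Bounded inputs with a fixed destination go through the new BatchLoadBigQuery transform, which stages files under the temp location and issues BigQuery load jobs; unbounded inputs, or writes driven by a table-reference function, go through StreamWithDeDup and the streaming-insert path. The helper DoFns and coders (WriteBundles, WritePartition, WriteTables, WriteRename, StreamingWriteFn, TagWithUniqueIdsAndTable, ShardedKeyCoder, TableRowInfoCoder) move into their own files with no intended behavior change, and the caller-facing API is unchanged. A minimal usage sketch follows; the writeTableRows() entry point, table spec, schema, and bucket in the sketch are illustrative assumptions and may not match the exact factory method at this commit, but the dispositions are the ones referenced in the diff.)

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.TableRowJsonCoder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    // Sketch only: BigQueryIO.writeTableRows() is an assumed entry point; the destination
    // table, schema, and temp bucket are hypothetical placeholders.
    BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
    options.setTempLocation("gs://my-bucket/tmp");  // batch loads stage files under tempLocation
    Pipeline p = Pipeline.create(options);

    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("word").setType("STRING"),
        new TableFieldSchema().setName("count").setType("INTEGER")));

    p.apply("MakeRows", Create.of(new TableRow().set("word", "beam").set("count", 1))
            .withCoder(TableRowJsonCoder.of()))
     .apply("WriteToBigQuery",
         BigQueryIO.writeTableRows()                                   // assumed factory method
             .to("my-project:my_dataset.my_table")                     // hypothetical table spec
             .withSchema(schema)
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));
    p.run();

    // Bounded input and no table function: expands to BatchLoadBigQuery (temp files + load jobs).
    // Unbounded input or a tableRefFunction: expands to StreamWithDeDup (streaming inserts).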
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d6ef0104
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d6ef0104
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d6ef0104

Branch: refs/heads/master
Commit: d6ef0104d5b8e1076cea2a9d08ad14ba0e8e84f1
Parents: 2cc2e81
Author: Reuven Lax <[email protected]>
Authored: Fri Mar 17 15:30:53 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Tue Mar 28 08:46:15 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  |  182 +++
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |   64 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 1063 +-----------------
 .../io/gcp/bigquery/BigQueryQuerySource.java    |   21 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   20 +-
 .../io/gcp/bigquery/BigQueryTableSource.java    |   18 +
 .../io/gcp/bigquery/PassThroughThenCleanup.java |   18 +
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |   44 +
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    |   87 ++
 .../sdk/io/gcp/bigquery/StreamWithDeDup.java    |   98 ++
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |  186 +++
 .../beam/sdk/io/gcp/bigquery/TableRowInfo.java  |   34 +
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |   68 ++
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |   84 ++
 .../gcp/bigquery/TagWithUniqueIdsAndTable.java  |  135 +++
 .../sdk/io/gcp/bigquery/TransformingSource.java |   18 +
 .../beam/sdk/io/gcp/bigquery/WriteBundles.java  |   82 ++
 .../sdk/io/gcp/bigquery/WritePartition.java     |   79 ++
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |  180 +++
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  213 ++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   39 +-
 21 files changed, 1653 insertions(+), 1080 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
new file mode 100644
index 0000000..75b1cc7
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
+ */
+class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
+  BigQueryIO.Write<T> write;
+
+  BatchLoadBigQuery(BigQueryIO.Write<T> write) {
+    this.write = write;
+  }
+
+  @Override
+  public PDone expand(PCollection<T> input) {
+    Pipeline p = input.getPipeline();
+    BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+    ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
+
+    final String stepUuid = BigQueryHelpers.randomUUIDString();
+
+    String tempLocation = options.getTempLocation();
+    String tempFilePrefix;
+    try {
+      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+      tempFilePrefix = factory.resolve(
+          factory.resolve(tempLocation, "BigQueryWriteTemp"),
+          stepUuid);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+          e);
+    }
+
+    // Create a singleton job ID token at execution time.
+    PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+    PCollectionView<String> jobIdTokenView = p
+        .apply("TriggerIdCreation", Create.of("ignored"))
+        .apply("CreateJobId", MapElements.via(
+            new SimpleFunction<String, String>() {
+              @Override
+              public String apply(String input) {
+                return stepUuid;
+              }
+            }))
+        .apply(View.<String>asSingleton());
+
+    PCollection<T> typedInputInGlobalWindow =
+        input.apply(
+            Window.<T>into(new GlobalWindows())
+                .triggering(DefaultTrigger.of())
+                .discardingFiredPanes());
+    // Avoid applying the formatFunction if it is the identity formatter.
+    PCollection<TableRow> inputInGlobalWindow;
+    if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
+      inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
+    } else {
+      inputInGlobalWindow = typedInputInGlobalWindow
+          .apply(MapElements.via(write.getFormatFunction())
+              .withOutputType(new TypeDescriptor<TableRow>() {
+              }));
+    }
+
+    // PCollection of filename, file byte size.
+    PCollection<KV<String, Long>> results = inputInGlobalWindow
+        .apply("WriteBundles",
+            ParDo.of(new WriteBundles(tempFilePrefix)));
+
+    TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<Long, List<String>>> singlePartitionTag =
+        new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+    // Turn the list of files and record counts in a PCollectionView that can be used as a
+    // side input.
+    PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+        .apply("ResultsView", View.<KV<String, Long>>asIterable());
+    PCollectionTuple partitions = singleton.apply(ParDo
+        .of(new WritePartition(
+            resultsView,
+            multiPartitionsTag,
+            singlePartitionTag))
+        .withSideInputs(resultsView)
+        .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+    // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
+    // the import needs to be split into multiple partitions, and those partitions will be
+    // specified in multiPartitionsTag.
+    PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+        .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+        .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+            false,
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            tempFilePrefix,
+            NestedValueProvider.of(table, new TableRefToJson()),
+            write.getJsonSchema(),
+            WriteDisposition.WRITE_EMPTY,
+            CreateDisposition.CREATE_IF_NEEDED,
+            write.getTableDescription()))
+        .withSideInputs(jobIdTokenView));
+
+    PCollectionView<Iterable<String>> tempTablesView = tempTables
+        .apply("TempTablesView", View.<String>asIterable());
+    singleton.apply(ParDo
+        .of(new WriteRename(
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            NestedValueProvider.of(table, new TableRefToJson()),
+            write.getWriteDisposition(),
+            write.getCreateDisposition(),
+            tempTablesView,
+            write.getTableDescription()))
+        .withSideInputs(tempTablesView, jobIdTokenView));
+
+    // Write single partition to final table
+    partitions.get(singlePartitionTag)
+        .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+        .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+            true,
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            tempFilePrefix,
+            NestedValueProvider.of(table, new TableRefToJson()),
+            write.getJsonSchema(),
+            write.getWriteDisposition(),
+            write.getCreateDisposition(),
+            write.getTableDescription()))
+        .withSideInputs(jobIdTokenView));
+
+    return PDone.in(input.getPipeline());
+  }
+}


http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 9fba938..37ff124 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -1,5 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.sdk.io.gcp.bigquery; +import static com.google.common.base.Preconditions.checkState; + import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.TableReference; @@ -14,16 +34,34 @@ import java.util.UUID; import java.util.regex.Matcher; import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; - /** * A set of helper functions and classes used by {@link BigQueryIO}. */ -public class BigQueryHelpers { +class BigQueryHelpers { + private static final String RESOURCE_NOT_FOUND_ERROR = + "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline" + + " execution. If the %1$s is created by an earlier stage of the pipeline, this" + + " validation can be disabled using #withoutValidation."; + + private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR = + "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by" + + " an earlier stage of the pipeline, this validation can be disabled using" + + " #withoutValidation."; + + /** + * Status of a BigQuery job or request. + */ + enum Status { + SUCCEEDED, + FAILED, + UNKNOWN, + } + @Nullable /** * Return a displayable string representation for a {@link TableReference}. 
@@ -138,6 +176,26 @@ public class BigQueryHelpers { return UUID.randomUUID().toString().replaceAll("-", ""); } + static void verifyTableNotExistOrEmpty( + DatasetService datasetService, + TableReference tableRef) { + try { + if (datasetService.getTable(tableRef) != null) { + checkState( + datasetService.isTableEmpty(tableRef), + "BigQuery table is not empty: %s.", + toTableSpec(tableRef)); + } + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException( + "unable to confirm BigQuery table emptiness for table " + + toTableSpec(tableRef), e); + } + } + @VisibleForTesting static class JsonSchemaToTableSchema implements SerializableFunction<String, TableSchema> { http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3f2f3e8..4917083 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -18,19 +18,13 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; -import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; -import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -39,54 +33,30 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.common.io.CountingOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Pattern; import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AtomicCoder; import 
org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -94,41 +64,19 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.FileIOChannelFactory; -import org.apache.beam.sdk.util.GcsIOChannelFactory; -import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.TupleTag; -import 
org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -296,7 +244,7 @@ public class BigQueryIO { * A formatting function that maps a TableRow to itself. This allows sending a * {@code PCollection<TableRow>} directly to BigQueryIO.Write. */ - private static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = + static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = new SerializableFunction<TableRow, TableRow>() { @Override public TableRow apply(TableRow input) { @@ -622,7 +570,7 @@ public class BigQueryIO { * * <p>If the table's project is not specified, use the executing project. */ - @Nullable private ValueProvider<TableReference> getTableWithDefaultProject( + @Nullable ValueProvider<TableReference> getTableWithDefaultProject( BigQueryOptions bqOptions) { ValueProvider<TableReference> table = getTableProvider(); if (table == null) { @@ -753,11 +701,11 @@ public class BigQueryIO { static final long MAX_SIZE_BYTES = 11 * (1L << 40); // The maximum number of retry jobs. - private static final int MAX_RETRY_JOBS = 3; + static final int MAX_RETRY_JOBS = 3; // The maximum number of retries to poll the status of a job. // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. - private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; @Nullable abstract ValueProvider<String> getJsonTableRef(); @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableReference> @@ -974,26 +922,6 @@ public class BigQueryIO { return toBuilder().setBigQueryServices(testServices).build(); } - private static void verifyTableNotExistOrEmpty( - DatasetService datasetService, - TableReference tableRef) { - try { - if (datasetService.getTable(tableRef) != null) { - checkState( - datasetService.isTableEmpty(tableRef), - "BigQuery table is not empty: %s.", - BigQueryHelpers.toTableSpec(tableRef)); - } - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException( - "unable to confirm BigQuery table emptiness for table " - + BigQueryHelpers.toTableSpec(tableRef), e); - } - } - @Override public void validate(PCollection<T> input) { BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); @@ -1030,7 +958,7 @@ public class BigQueryIO { verifyTablePresence(datasetService, table); } if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { - verifyTableNotExistOrEmpty(datasetService, table); + BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table); } } @@ -1074,176 +1002,12 @@ public class BigQueryIO { @Override public PDone expand(PCollection<T> input) { - Pipeline p = input.getPipeline(); - BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); - // When writing an Unbounded PCollection, or when a tablespec function is defined, we use // StreamWithDeDup and BigQuery's streaming import API. 
if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) { return input.apply(new StreamWithDeDup<T>(this)); - } - - ValueProvider<TableReference> table = getTableWithDefaultProject(options); - - final String stepUuid = BigQueryHelpers.randomUUIDString(); - - String tempLocation = options.getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = factory.resolve( - factory.resolve(tempLocation, "BigQueryWriteTemp"), - stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), - e); - } - - // Create a singleton job ID token at execution time. - PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix)); - PCollectionView<String> jobIdTokenView = p - .apply("TriggerIdCreation", Create.of("ignored")) - .apply("CreateJobId", MapElements.via( - new SimpleFunction<String, String>() { - @Override - public String apply(String input) { - return stepUuid; - } - })) - .apply(View.<String>asSingleton()); - - PCollection<T> typedInputInGlobalWindow = - input.apply( - Window.<T>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); - // Avoid applying the formatFunction if it is the identity formatter. - PCollection<TableRow> inputInGlobalWindow; - if (getFormatFunction() == IDENTITY_FORMATTER) { - inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow; } else { - inputInGlobalWindow = typedInputInGlobalWindow - .apply(MapElements.via(getFormatFunction()) - .withOutputType(new TypeDescriptor<TableRow>() { - })); - } - - // PCollection of filename, file byte size. - PCollection<KV<String, Long>> results = inputInGlobalWindow - .apply("WriteBundles", - ParDo.of(new WriteBundles(tempFilePrefix))); - - TupleTag<KV<Long, List<String>>> multiPartitionsTag = - new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<Long, List<String>>> singlePartitionTag = - new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; - - // Turn the list of files and record counts in a PCollectionView that can be used as a - // side input. - PCollectionView<Iterable<KV<String, Long>>> resultsView = results - .apply("ResultsView", View.<KV<String, Long>>asIterable()); - PCollectionTuple partitions = singleton.apply(ParDo - .of(new WritePartition( - resultsView, - multiPartitionsTag, - singlePartitionTag)) - .withSideInputs(resultsView) - .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - - // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then - // the import needs to be split into multiple partitions, and those partitions will be - // specified in multiPartitionsTag. 
- PCollection<String> tempTables = partitions.get(multiPartitionsTag) - .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create()) - .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( - false, - getBigQueryServices(), - jobIdTokenView, - tempFilePrefix, - NestedValueProvider.of(table, new TableRefToJson()), - getJsonSchema(), - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED, - getTableDescription())) - .withSideInputs(jobIdTokenView)); - - PCollectionView<Iterable<String>> tempTablesView = tempTables - .apply("TempTablesView", View.<String>asIterable()); - singleton.apply(ParDo - .of(new WriteRename( - getBigQueryServices(), - jobIdTokenView, - NestedValueProvider.of(table, new TableRefToJson()), - getWriteDisposition(), - getCreateDisposition(), - tempTablesView, - getTableDescription())) - .withSideInputs(tempTablesView, jobIdTokenView)); - - // Write single partition to final table - partitions.get(singlePartitionTag) - .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create()) - .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( - true, - getBigQueryServices(), - jobIdTokenView, - tempFilePrefix, - NestedValueProvider.of(table, new TableRefToJson()), - getJsonSchema(), - getWriteDisposition(), - getCreateDisposition(), - getTableDescription())) - .withSideInputs(jobIdTokenView)); - - return PDone.in(input.getPipeline()); - } - - private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> { - private transient TableRowWriter writer = null; - private final String tempFilePrefix; - - WriteBundles(String tempFilePrefix) { - this.tempFilePrefix = tempFilePrefix; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - if (writer == null) { - writer = new TableRowWriter(tempFilePrefix); - writer.open(UUID.randomUUID().toString()); - LOG.debug("Done opening writer {}", writer); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and close the write. - try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; - } - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - if (writer != null) { - c.output(writer.close()); - writer = null; - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder - .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) - .withLabel("Temporary File Prefix")); + return input.apply(new BatchLoadBigQuery<T>(this)); } } @@ -1289,7 +1053,7 @@ public class BigQueryIO { * * <p>If the table's project is not specified, use the executing project. 
*/ - @Nullable private ValueProvider<TableReference> getTableWithDefaultProject( + @Nullable ValueProvider<TableReference> getTableWithDefaultProject( BigQueryOptions bqOptions) { ValueProvider<TableReference> table = getTable(); if (table == null) { @@ -1319,391 +1083,6 @@ public class BigQueryIO { } - static class TableRowWriter { - private static final Coder<TableRow> CODER = TableRowJsonCoder.of(); - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final String tempFilePrefix; - private String id; - private String fileName; - private WritableByteChannel channel; - protected String mimeType = MimeTypes.TEXT; - private CountingOutputStream out; - - TableRowWriter(String basename) { - this.tempFilePrefix = basename; - } - - public final void open(String uId) throws Exception { - id = uId; - fileName = tempFilePrefix + id; - LOG.debug("Opening {}.", fileName); - channel = IOChannelUtils.create(fileName, mimeType); - try { - out = new CountingOutputStream(Channels.newOutputStream(channel)); - LOG.debug("Writing header to {}.", fileName); - } catch (Exception e) { - try { - LOG.error("Writing header to {} failed, closing channel.", fileName); - channel.close(); - } catch (IOException closeException) { - LOG.error("Closing channel for {} failed", fileName); - } - throw e; - } - LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); - } - - public void write(TableRow value) throws Exception { - CODER.encode(value, out, Context.OUTER); - out.write(NEWLINE); - } - - public final KV<String, Long> close() throws IOException { - channel.close(); - return KV.of(fileName, out.getCount()); - } - } - - /** - * Partitions temporary files based on number of files and file sizes. - */ - static class WritePartition extends DoFn<String, KV<Long, List<String>>> { - private final PCollectionView<Iterable<KV<String, Long>>> resultsView; - private TupleTag<KV<Long, List<String>>> multiPartitionsTag; - private TupleTag<KV<Long, List<String>>> singlePartitionTag; - - public WritePartition( - PCollectionView<Iterable<KV<String, Long>>> resultsView, - TupleTag<KV<Long, List<String>>> multiPartitionsTag, - TupleTag<KV<Long, List<String>>> singlePartitionTag) { - this.resultsView = resultsView; - this.multiPartitionsTag = multiPartitionsTag; - this.singlePartitionTag = singlePartitionTag; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView)); - if (results.isEmpty()) { - TableRowWriter writer = new TableRowWriter(c.element()); - writer.open(UUID.randomUUID().toString()); - results.add(writer.close()); - } - - long partitionId = 0; - int currNumFiles = 0; - long currSizeBytes = 0; - List<String> currResults = Lists.newArrayList(); - for (int i = 0; i < results.size(); ++i) { - KV<String, Long> fileResult = results.get(i); - if (currNumFiles + 1 > Write.MAX_NUM_FILES - || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) { - c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); - currResults = Lists.newArrayList(); - currNumFiles = 0; - currSizeBytes = 0; - } - ++currNumFiles; - currSizeBytes += fileResult.getValue(); - currResults.add(fileResult.getKey()); - } - if (partitionId == 0) { - c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults)); - } else { - c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); - } - } - } - - /** - * Writes partitions to BigQuery tables. 
- */ - static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> { - private final boolean singlePartition; - private final BigQueryServices bqServices; - private final PCollectionView<String> jobIdToken; - private final String tempFilePrefix; - private final ValueProvider<String> jsonTableRef; - private final ValueProvider<String> jsonSchema; - private final WriteDisposition writeDisposition; - private final CreateDisposition createDisposition; - @Nullable private final String tableDescription; - - public WriteTables( - boolean singlePartition, - BigQueryServices bqServices, - PCollectionView<String> jobIdToken, - String tempFilePrefix, - ValueProvider<String> jsonTableRef, - ValueProvider<String> jsonSchema, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - @Nullable String tableDescription) { - this.singlePartition = singlePartition; - this.bqServices = bqServices; - this.jobIdToken = jobIdToken; - this.tempFilePrefix = tempFilePrefix; - this.jsonTableRef = jsonTableRef; - this.jsonSchema = jsonSchema; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; - this.tableDescription = tableDescription; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - List<String> partition = Lists.newArrayList(c.element().getValue()).get(0); - String jobIdPrefix = String.format( - c.sideInput(jobIdToken) + "_%05d", c.element().getKey()); - TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(), - TableReference.class); - if (!singlePartition) { - ref.setTableId(jobIdPrefix); - } - - load( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - ref, - BigQueryHelpers.fromJsonString( - jsonSchema == null ? 
null : jsonSchema.get(), TableSchema.class), - partition, - writeDisposition, - createDisposition, - tableDescription); - c.output(BigQueryHelpers.toJsonString(ref)); - - removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition); - } - - private void load( - JobService jobService, - DatasetService datasetService, - String jobIdPrefix, - TableReference ref, - @Nullable TableSchema schema, - List<String> gcsUris, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - @Nullable String tableDescription) throws InterruptedException, IOException { - JobConfigurationLoad loadConfig = new JobConfigurationLoad() - .setDestinationTable(ref) - .setSchema(schema) - .setSourceUris(gcsUris) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON"); - - String projectId = ref.getProjectId(); - Job lastFailedLoadJob = null; - for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { - String jobId = jobIdPrefix + "-" + i; - JobReference jobRef = new JobReference() - .setProjectId(projectId) - .setJobId(jobId); - jobService.startLoadJob(jobRef, loadConfig); - Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); - Status jobStatus = BigQueryHelpers.parseStatus(loadJob); - switch (jobStatus) { - case SUCCEEDED: - if (tableDescription != null) { - datasetService.patchTableDescription(ref, tableDescription); - } - return; - case UNKNOWN: - throw new RuntimeException(String.format( - "UNKNOWN status of load job [%s]: %s.", jobId, - BigQueryHelpers.jobToPrettyString(loadJob))); - case FAILED: - lastFailedLoadJob = loadJob; - continue; - default: - throw new IllegalStateException(String.format( - "Unexpected status [%s] of load job: %s.", - jobStatus, BigQueryHelpers.jobToPrettyString(loadJob))); - } - } - throw new RuntimeException(String.format( - "Failed to create load job with id prefix %s, " - + "reached max retries: %d, last failed load job: %s.", - jobIdPrefix, - Write.MAX_RETRY_JOBS, - BigQueryHelpers.jobToPrettyString(lastFailedLoadJob))); - } - - static void removeTemporaryFiles( - PipelineOptions options, - String tempFilePrefix, - Collection<String> files) - throws IOException { - IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix); - if (factory instanceof GcsIOChannelFactory) { - GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options); - gcsUtil.remove(files); - } else if (factory instanceof FileIOChannelFactory) { - for (String filename : files) { - LOG.debug("Removing file {}", filename); - boolean exists = Files.deleteIfExists(Paths.get(filename)); - if (!exists) { - LOG.debug("{} does not exist.", filename); - } - } - } else { - throw new IOException("Unrecognized file system."); - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder - .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) - .withLabel("Temporary File Prefix")) - .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) - .withLabel("Table Reference")) - .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema) - .withLabel("Table Schema")) - .addIfNotNull(DisplayData.item("tableDescription", tableDescription) - .withLabel("Table Description")); - } - } - - /** - * Copies temporary tables to destination table. 
- */ - static class WriteRename extends DoFn<String, Void> { - private final BigQueryServices bqServices; - private final PCollectionView<String> jobIdToken; - private final ValueProvider<String> jsonTableRef; - private final WriteDisposition writeDisposition; - private final CreateDisposition createDisposition; - private final PCollectionView<Iterable<String>> tempTablesView; - @Nullable private final String tableDescription; - - public WriteRename( - BigQueryServices bqServices, - PCollectionView<String> jobIdToken, - ValueProvider<String> jsonTableRef, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - PCollectionView<Iterable<String>> tempTablesView, - @Nullable String tableDescription) { - this.bqServices = bqServices; - this.jobIdToken = jobIdToken; - this.jsonTableRef = jsonTableRef; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; - this.tempTablesView = tempTablesView; - this.tableDescription = tableDescription; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); - - // Do not copy if no temp tables are provided - if (tempTablesJson.size() == 0) { - return; - } - - List<TableReference> tempTables = Lists.newArrayList(); - for (String table : tempTablesJson) { - tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class)); - } - copy( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - c.sideInput(jobIdToken), - BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class), - tempTables, - writeDisposition, - createDisposition, - tableDescription); - - DatasetService tableService = - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); - removeTemporaryTables(tableService, tempTables); - } - - private void copy( - JobService jobService, - DatasetService datasetService, - String jobIdPrefix, - TableReference ref, - List<TableReference> tempTables, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - @Nullable String tableDescription) throws InterruptedException, IOException { - JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() - .setSourceTables(tempTables) - .setDestinationTable(ref) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()); - - String projectId = ref.getProjectId(); - Job lastFailedCopyJob = null; - for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { - String jobId = jobIdPrefix + "-" + i; - JobReference jobRef = new JobReference() - .setProjectId(projectId) - .setJobId(jobId); - jobService.startCopyJob(jobRef, copyConfig); - Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); - Status jobStatus = BigQueryHelpers.parseStatus(copyJob); - switch (jobStatus) { - case SUCCEEDED: - if (tableDescription != null) { - datasetService.patchTableDescription(ref, tableDescription); - } - return; - case UNKNOWN: - throw new RuntimeException(String.format( - "UNKNOWN status of copy job [%s]: %s.", jobId, - BigQueryHelpers.jobToPrettyString(copyJob))); - case FAILED: - lastFailedCopyJob = copyJob; - continue; - default: - throw new IllegalStateException(String.format( - "Unexpected status [%s] of load job: %s.", - jobStatus, BigQueryHelpers.jobToPrettyString(copyJob))); - } - } - throw new RuntimeException(String.format( - "Failed 
to create copy job with id prefix %s, " - + "reached max retries: %d, last failed copy job: %s.", - jobIdPrefix, - Write.MAX_RETRY_JOBS, - BigQueryHelpers.jobToPrettyString(lastFailedCopyJob))); - } - - static void removeTemporaryTables(DatasetService tableService, - List<TableReference> tempTables) { - for (TableReference tableRef : tempTables) { - try { - LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef)); - tableService.deleteTable(tableRef); - } catch (Exception e) { - LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e); - } - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder - .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) - .withLabel("Table Reference")) - .add(DisplayData.item("writeDisposition", writeDisposition.toString()) - .withLabel("Write Disposition")) - .add(DisplayData.item("createDisposition", createDisposition.toString()) - .withLabel("Create Disposition")); - } - } } private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { @@ -1753,434 +1132,6 @@ public class BigQueryIO { static void clearCreatedTables() { StreamingWriteFn.clearCreatedTables(); } - ///////////////////////////////////////////////////////////////////////////// - - /** - * Implementation of DoFn to perform streaming BigQuery write. - */ - @VisibleForTesting - static class StreamingWriteFn - extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { - /** TableSchema in JSON. Use String to make the class Serializable. */ - @Nullable private final ValueProvider<String> jsonTableSchema; - - @Nullable private final String tableDescription; - - private final BigQueryServices bqServices; - - /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ - private transient Map<String, List<TableRow>> tableRows; - - private final Write.CreateDisposition createDisposition; - - /** The list of unique ids for each BigQuery table row. */ - private transient Map<String, List<String>> uniqueIdsForTableRows; - - /** The list of tables created so far, so we don't try the creation - each time. */ - private static Set<String> createdTables = - Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); - - /** Tracks bytes written, exposed as "ByteCount" Metrics Counter. */ - private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount"); - - /** Constructor. */ - StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema, - Write.CreateDisposition createDisposition, - @Nullable String tableDescription, BigQueryServices bqServices) { - this.jsonTableSchema = schema == null ? null : - NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); - this.createDisposition = createDisposition; - this.bqServices = checkNotNull(bqServices, "bqServices"); - this.tableDescription = tableDescription; - } - - /** - * Clear the cached map of created tables. Used for testing. - */ - private static void clearCreatedTables() { - synchronized (createdTables) { - createdTables.clear(); - } - } - - /** Prepares a target BigQuery table. */ - @StartBundle - public void startBundle(Context context) { - tableRows = new HashMap<>(); - uniqueIdsForTableRows = new HashMap<>(); - } - - /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. 
*/ - @ProcessElement - public void processElement(ProcessContext context) { - String tableSpec = context.element().getKey().getKey(); - List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); - List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, - tableSpec); - - rows.add(context.element().getValue().tableRow); - uniqueIds.add(context.element().getValue().uniqueId); - } - - /** Writes the accumulated rows into BigQuery with streaming API. */ - @FinishBundle - public void finishBundle(Context context) throws Exception { - BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); - - for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) { - TableReference tableReference = getOrCreateTable(options, entry.getKey()); - flushRows(tableReference, entry.getValue(), - uniqueIdsForTableRows.get(entry.getKey()), options); - } - tableRows.clear(); - uniqueIdsForTableRows.clear(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder - .addIfNotNull(DisplayData.item("schema", jsonTableSchema) - .withLabel("Table Schema")) - .addIfNotNull(DisplayData.item("tableDescription", tableDescription) - .withLabel("Table Description")); - } - - public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) - throws InterruptedException, IOException { - TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec); - if (createDisposition != createDisposition.CREATE_NEVER - && !createdTables.contains(tableSpec)) { - synchronized (createdTables) { - // Another thread may have succeeded in creating the table in the meanwhile, so - // check again. This check isn't needed for correctness, but we add it to prevent - // every thread from attempting a create and overwhelming our BigQuery quota. - DatasetService datasetService = bqServices.getDatasetService(options); - if (!createdTables.contains(tableSpec)) { - if (datasetService.getTable(tableReference) == null) { - TableSchema tableSchema = JSON_FACTORY.fromString( - jsonTableSchema.get(), TableSchema.class); - datasetService.createTable( - new Table() - .setTableReference(tableReference) - .setSchema(tableSchema) - .setDescription(tableDescription)); - } - createdTables.add(tableSpec); - } - } - } - return tableReference; - } - - /** - * Writes the accumulated rows into BigQuery with streaming API. - */ - private void flushRows(TableReference tableReference, - List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) - throws InterruptedException { - if (!tableRows.isEmpty()) { - try { - long totalBytes = bqServices.getDatasetService(options).insertAll( - tableReference, tableRows, uniqueIds); - byteCounter.inc(totalBytes); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - } - - private static class ShardedKey<K> { - private final K key; - private final int shardNumber; - - public static <K> ShardedKey<K> of(K key, int shardNumber) { - return new ShardedKey<>(key, shardNumber); - } - - private ShardedKey(K key, int shardNumber) { - this.key = key; - this.shardNumber = shardNumber; - } - - public K getKey() { - return key; - } - - public int getShardNumber() { - return shardNumber; - } - } - - /** - * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. 
- */ - @VisibleForTesting - static class ShardedKeyCoder<KeyT> - extends StandardCoder<ShardedKey<KeyT>> { - public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) { - return new ShardedKeyCoder<>(keyCoder); - } - - @JsonCreator - public static <KeyT> ShardedKeyCoder<KeyT> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<KeyT>> components) { - checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); - return of(components.get(0)); - } - - protected ShardedKeyCoder(Coder<KeyT> keyCoder) { - this.keyCoder = keyCoder; - this.shardNumberCoder = VarIntCoder.of(); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Arrays.asList(keyCoder); - } - - @Override - public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context) - throws IOException { - keyCoder.encode(key.getKey(), outStream, context.nested()); - shardNumberCoder.encode(key.getShardNumber(), outStream, context); - } - - @Override - public ShardedKey<KeyT> decode(InputStream inStream, Context context) - throws IOException { - return new ShardedKey<>( - keyCoder.decode(inStream, context.nested()), - shardNumberCoder.decode(inStream, context)); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - keyCoder.verifyDeterministic(); - } - - Coder<KeyT> keyCoder; - VarIntCoder shardNumberCoder; - } - - @VisibleForTesting - static class TableRowInfoCoder extends AtomicCoder<TableRowInfo> { - private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder(); - - @JsonCreator - public static TableRowInfoCoder of() { - return INSTANCE; - } - - @Override - public void encode(TableRowInfo value, OutputStream outStream, Context context) - throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null value"); - } - tableRowCoder.encode(value.tableRow, outStream, context.nested()); - idCoder.encode(value.uniqueId, outStream, context); - } - - @Override - public TableRowInfo decode(InputStream inStream, Context context) - throws IOException { - return new TableRowInfo( - tableRowCoder.decode(inStream, context.nested()), - idCoder.decode(inStream, context)); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, "TableRows are not deterministic."); - } - - TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of(); - StringUtf8Coder idCoder = StringUtf8Coder.of(); - } - - private static class TableRowInfo { - TableRowInfo(TableRow tableRow, String uniqueId) { - this.tableRow = tableRow; - this.uniqueId = uniqueId; - } - - final TableRow tableRow; - final String uniqueId; - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Fn that tags each table row with a unique id and destination table. - * To avoid calling UUID.randomUUID() for each element, which can be costly, - * a randomUUID is generated only once per bucket of data. The actual unique - * id is created by concatenating this randomUUID with a sequential number. - */ - @VisibleForTesting - static class TagWithUniqueIdsAndTable<T> - extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> { - /** TableSpec to write to. */ - private final ValueProvider<String> tableSpec; - - /** User function mapping windowed values to {@link TableReference} in JSON. 
*/ - private final SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction; - - /** User function mapping user type to a TableRow. */ - private final SerializableFunction<T, TableRow> formatFunction; - - private transient String randomUUID; - private transient long sequenceNo = 0L; - - TagWithUniqueIdsAndTable(BigQueryOptions options, - ValueProvider<TableReference> table, - SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction, - SerializableFunction<T, TableRow> formatFunction) { - checkArgument(table == null ^ tableRefFunction == null, - "Exactly one of table or tableRefFunction should be set"); - if (table != null) { - if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) { - TableReference tableRef = table.get() - .setProjectId(options.as(BigQueryOptions.class).getProject()); - table = NestedValueProvider.of( - StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)), - new JsonTableRefToTableRef()); - } - this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec()); - } else { - tableSpec = null; - } - this.tableRefFunction = tableRefFunction; - this.formatFunction = formatFunction; - } - - - @StartBundle - public void startBundle(Context context) { - randomUUID = UUID.randomUUID().toString(); - } - - /** Tag the input with a unique id. */ - @ProcessElement - public void processElement(ProcessContext context, BoundedWindow window) throws IOException { - String uniqueId = randomUUID + sequenceNo++; - ThreadLocalRandom randomGenerator = ThreadLocalRandom.current(); - String tableSpec = tableSpecFromWindowedValue( - context.getPipelineOptions().as(BigQueryOptions.class), - ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); - // We output on keys 0-50 to ensure that there's enough batching for - // BigQuery. - context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)), - new TableRowInfo(formatFunction.apply(context.element()), uniqueId))); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder.addIfNotNull(DisplayData.item("table", tableSpec)); - if (tableRefFunction != null) { - builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) - .withLabel("Table Reference Function")); - } - } - - @VisibleForTesting - ValueProvider<String> getTableSpec() { - return tableSpec; - } - - private String tableSpecFromWindowedValue(BigQueryOptions options, - ValueInSingleWindow<T> value) { - if (tableSpec != null) { - return tableSpec.get(); - } else { - TableReference table = tableRefFunction.apply(value); - if (table.getProjectId() == null) { - table.setProjectId(options.getProject()); - } - return BigQueryHelpers.toTableSpec(table); - } - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * PTransform that performs streaming BigQuery write. To increase consistency, - * it leverages BigQuery best effort de-dup mechanism. - */ - private static class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> { - private final Write<T> write; - - /** Constructor. */ - StreamWithDeDup(Write<T> write) { - this.write = write; - } - - @Override - protected Coder<Void> getDefaultOutputCoder() { - return VoidCoder.of(); - } - - @Override - public PDone expand(PCollection<T> input) { - // A naive implementation would be to simply stream data directly to BigQuery. 
- // However, this could occasionally lead to duplicated data, e.g., when - // a VM that runs this code is restarted and the code is re-run. - - // The above risk is mitigated in this implementation by relying on - // BigQuery built-in best effort de-dup mechanism. - - // To use this mechanism, each input TableRow is tagged with a generated - // unique id, which is then passed to BigQuery and used to ignore duplicates. - - PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = - input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>( - input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(), - write.getTableRefFunction(), write.getFormatFunction()))); - - // To prevent having the same TableRow processed more than once with regenerated - // different unique ids, this implementation relies on "checkpointing", which is - // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, - // performed by Reshuffle. - NestedValueProvider<TableSchema, String> schema = - write.getJsonSchema() == null - ? null - : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema()); - tagged - .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) - .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) - .apply( - ParDo.of( - new StreamingWriteFn( - schema, - write.getCreateDisposition(), - write.getTableDescription(), - write.getBigQueryServices()))); - - // Note that the implementation to return PDone here breaks the - // implicit assumption about the job execution order. If a user - // implements a PTransform that takes PDone returned here as its - // input, the transform may not necessarily be executed after - // the BigQueryIO.Write. - - return PDone.in(input.getPipeline()); - } - } - - /** - * Status of a BigQuery job or request. - */ - enum Status { - SUCCEEDED, - FAILED, - UNKNOWN, - } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index a909957..9153157 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; @@ -14,9 +32,10 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.List; import java.util.concurrent.atomic.AtomicReference; + +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index ff50e6d..746258f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; @@ -18,7 +36,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index aae0faa..cbd5781 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java index 612afbe..75f7b93 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.sdk.io.gcp.bigquery; import com.google.common.annotations.VisibleForTesting; http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java new file mode 100644 index 0000000..8c968df --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +/** + * A key and a shard number. + */ +class ShardedKey<K> { + private final K key; + private final int shardNumber; + + public static <K> ShardedKey<K> of(K key, int shardNumber) { + return new ShardedKey<>(key, shardNumber); + } + + ShardedKey(K key, int shardNumber) { + this.key = key; + this.shardNumber = shardNumber; + } + + public K getKey() { + return key; + } + + public int getShardNumber() { + return shardNumber; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java new file mode 100644 index 0000000..be4e71c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.util.PropertyNames; + + +/** + * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. + */ +@VisibleForTesting +class ShardedKeyCoder<KeyT> + extends StandardCoder<ShardedKey<KeyT>> { + public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) { + return new ShardedKeyCoder<>(keyCoder); + } + + @JsonCreator + public static <KeyT> ShardedKeyCoder<KeyT> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<KeyT>> components) { + checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); + return of(components.get(0)); + } + + protected ShardedKeyCoder(Coder<KeyT> keyCoder) { + this.keyCoder = keyCoder; + this.shardNumberCoder = VarIntCoder.of(); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.asList(keyCoder); + } + + @Override + public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context) + throws IOException { + keyCoder.encode(key.getKey(), outStream, context.nested()); + shardNumberCoder.encode(key.getShardNumber(), outStream, context); + } + + @Override + public ShardedKey<KeyT> decode(InputStream inStream, Context context) + throws IOException { + return new ShardedKey<>( + keyCoder.decode(inStream, context.nested()), + shardNumberCoder.decode(inStream, context)); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + keyCoder.verifyDeterministic(); + } + + Coder<KeyT> keyCoder; + VarIntCoder shardNumberCoder; +} http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java new file mode 100644 index 0000000..f667295 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableSchema; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.Reshuffle; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** +* PTransform that performs streaming BigQuery write. To increase consistency, +* it leverages BigQuery best effort de-dup mechanism. + */ +class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> { + private final Write<T> write; + + /** Constructor. */ + StreamWithDeDup(Write<T> write) { + this.write = write; + } + + @Override + protected Coder<Void> getDefaultOutputCoder() { + return VoidCoder.of(); + } + + @Override + public PDone expand(PCollection<T> input) { + // A naive implementation would be to simply stream data directly to BigQuery. + // However, this could occasionally lead to duplicated data, e.g., when + // a VM that runs this code is restarted and the code is re-run. + + // The above risk is mitigated in this implementation by relying on + // BigQuery built-in best effort de-dup mechanism. + + // To use this mechanism, each input TableRow is tagged with a generated + // unique id, which is then passed to BigQuery and used to ignore duplicates. + + PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = + input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>( + input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(), + write.getTableRefFunction(), write.getFormatFunction()))); + + // To prevent having the same TableRow processed more than once with regenerated + // different unique ids, this implementation relies on "checkpointing", which is + // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, + // performed by Reshuffle. + NestedValueProvider<TableSchema, String> schema = + write.getJsonSchema() == null + ? 
null + : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema()); + tagged + .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) + .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) + .apply( + ParDo.of( + new StreamingWriteFn( + schema, + write.getCreateDisposition(), + write.getTableDescription(), + write.getBigQueryServices()))); + + // Note that the implementation to return PDone here breaks the + // implicit assumption about the job execution order. If a user + // implements a PTransform that takes PDone returned here as its + // input, the transform may not necessarily be executed after + // the BigQueryIO.Write. + + return PDone.in(input.getPipeline()); + } +}
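
For readers tracing the refactor, here is a minimal sketch (not part of this commit) of how the extracted ShardedKey and ShardedKeyCoder classes round-trip a value, mirroring the coder that StreamWithDeDup installs via setCoder. The ShardedKeyCoderRoundTrip class name is hypothetical, and the sketch assumes it lives in the org.apache.beam.sdk.io.gcp.bigquery package, since both classes are package-private.

package org.apache.beam.sdk.io.gcp.bigquery;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

/** Illustrative only: round-trips a ShardedKey through ShardedKeyCoder. */
class ShardedKeyCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    ShardedKeyCoder<String> coder = ShardedKeyCoder.of(StringUtf8Coder.of());

    // A fixed shard number 7 stands in for the random shard in [0, 50)
    // that TagWithUniqueIdsAndTable picks for each element.
    ShardedKey<String> original = ShardedKey.of("project:dataset.table", 7);

    // Encode: the wrapped StringUtf8Coder handles the key, VarIntCoder the shard number.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(original, out, Coder.Context.OUTER);

    // Decode and verify the round trip.
    ShardedKey<String> decoded =
        coder.decode(new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    System.out.println(decoded.getKey() + " / shard " + decoded.getShardNumber());
  }
}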

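
The streaming de-dup described in StreamWithDeDup rests on the insert-id scheme from TagWithUniqueIdsAndTable: a per-bundle UUID prefix concatenated with an incrementing sequence number, plus a random shard key in [0, 50) so that BigQuery requests batch up per table. A standalone sketch of that pattern follows; the UniqueIdSketch class is hypothetical, carries no Beam dependencies, and is included only to make the scheme concrete.

import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical illustration of the id/shard scheme; not part of the Beam code base. */
class UniqueIdSketch {
  // A fresh prefix per bundle (see @StartBundle in TagWithUniqueIdsAndTable) keeps ids
  // unique across workers; the counter keeps them unique within a bundle.
  private final String bundlePrefix = UUID.randomUUID().toString();
  private long sequenceNo = 0L;

  /** Insert id handed to BigQuery for best-effort de-duplication. */
  String nextInsertId() {
    return bundlePrefix + sequenceNo++;
  }

  /** Shard in [0, 50), spreading each table's rows over enough keys for batching. */
  int nextShard() {
    return ThreadLocalRandom.current().nextInt(0, 50);
  }

  public static void main(String[] args) {
    UniqueIdSketch sketch = new UniqueIdSketch();
    System.out.println(sketch.nextInsertId() + " -> shard " + sketch.nextShard());
  }
}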