Repository: beam Updated Branches: refs/heads/release-2.0.0 3c6316abf -> e282601b7
[BEAM-2154] Make BigQuery's dynamic-destination support scale to large numbers of destinations This cherry-picks #2883 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1c7e35ae Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1c7e35ae Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1c7e35ae Branch: refs/heads/release-2.0.0 Commit: 1c7e35ae21ab09bbf5d458fd29a581eefd804646 Parents: 3c6316a Author: Eugene Kirpichov <[email protected]> Authored: Mon May 8 18:02:14 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue May 9 09:39:34 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 122 ++++++++++++++++--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 35 +++--- .../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 3 +- .../sdk/io/gcp/bigquery/TableRowWriter.java | 59 +++++---- .../io/gcp/bigquery/WriteBundlesToFiles.java | 111 ++++++++++++++--- .../bigquery/WriteGroupedRecordsToFiles.java | 68 +++++++++++ .../sdk/io/gcp/bigquery/WritePartition.java | 38 +++--- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 9 +- .../beam/sdk/io/gcp/bigquery/WriteResult.java | 2 +- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 7 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 81 ++++++++---- .../sdk/io/gcp/bigquery/FakeJobService.java | 6 +- 12 files changed, 396 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index ba64ab1..0abd469 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.TableRow; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; import java.util.List; @@ -32,11 +33,15 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -49,6 +54,7 @@ import 
org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -57,6 +63,36 @@ import org.apache.beam.sdk.values.TupleTagList; /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */ class BatchLoads<DestinationT> extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> { + // The maximum number of file writers to keep open in a single bundle at a time, since file + // writers default to 64mb buffers. This comes into play when writing dynamic table destinations. + // The first 20 tables from a single BatchLoads transform will write files inline in the + // transform. Anything beyond that might be shuffled. Users using this transform directly who + // know that they are running on workers with sufficient memory can increase this by calling + // BatchLoads#setMaxNumWritersPerBundle. This allows the workers to do more work in memory, and + // save on the cost of shuffling some of this data. + // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on + // their own policy. + @VisibleForTesting + static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20; + + @VisibleForTesting + // Maximum number of files in a single partition. + static final int MAX_NUM_FILES = 10000; + + @VisibleForTesting + // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit. + static final long MAX_SIZE_BYTES = 11 * (1L << 40); + + // The maximum size of a single file - 4TiB, just under the 5 TiB limit. + static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40); + + // The maximum number of retries to poll the status of a job. + // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. + static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + + // The maximum number of retry jobs. + static final int MAX_RETRY_JOBS = 3; + private BigQueryServices bigQueryServices; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; @@ -65,6 +101,8 @@ class BatchLoads<DestinationT> private final boolean singletonTable; private final DynamicDestinations<?, DestinationT> dynamicDestinations; private final Coder<DestinationT> destinationCoder; + private int maxNumWritersPerBundle; + private long maxFileSize; BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition, boolean singletonTable, @@ -76,12 +114,29 @@ class BatchLoads<DestinationT> this.singletonTable = singletonTable; this.dynamicDestinations = dynamicDestinations; this.destinationCoder = destinationCoder; + this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE; + this.maxFileSize = DEFAULT_MAX_FILE_SIZE; } void setTestServices(BigQueryServices bigQueryServices) { this.bigQueryServices = bigQueryServices; } + /** Get the maximum number of file writers that will be open simultaneously in a bundle. */ + public int getMaxNumWritersPerBundle() { + return maxNumWritersPerBundle; + } + + /** Set the maximum number of file writers that will be open simultaneously in a bundle. 
*/ + public void setMaxNumWritersPerBundle(int maxNumWritersPerBundle) { + this.maxNumWritersPerBundle = maxNumWritersPerBundle; + } + + @VisibleForTesting + void setMaxFileSize(long maxFileSize) { + this.maxFileSize = maxFileSize; + } + @Override public void validate(PipelineOptions options) { // We will use a BigQuery load job -- validate the temp location. @@ -107,21 +162,25 @@ class BatchLoads<DestinationT> Pipeline p = input.getPipeline(); final String stepUuid = BigQueryHelpers.randomUUIDString(); + PCollectionView<String> tempFilePrefix = + p.apply("Create", Create.of((Void) null)) + .apply( + "GetTempFilePrefix", + ParDo.of( + new DoFn<Void, String>() { + @ProcessElement + public void getTempFilePrefix(ProcessContext c) { + c.output( + resolveTempLocation( + c.getPipelineOptions().getTempLocation(), + "BigQueryWriteTemp", + stepUuid)); + } + })) + .apply("TempFilePrefixView", View.<String>asSingleton()); + // Create a singleton job ID token at execution time. This will be used as the base for all // load jobs issued from this instance of the transform. - PCollection<String> singleton = p - .apply("Create", Create.of((Void) null)) - .apply("GetTempFilePrefix", ParDo.of(new DoFn<Void, String>() { - @ProcessElement - public void getTempFilePrefix(ProcessContext c) { - c.output( - resolveTempLocation( - c.getPipelineOptions().getTempLocation(), - "BigQueryWriteTemp", - stepUuid)); - } - })); - PCollectionView<String> jobIdTokenView = p.apply("TriggerIdCreation", Create.of("ignored")) .apply( @@ -144,12 +203,37 @@ class BatchLoads<DestinationT> PCollectionView<Map<DestinationT, String>> schemasView = inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations)); + TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag = + new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles"){}; + TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag = + new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {}; + PCollectionTuple writeBundlesTuple = inputInGlobalWindow + .apply("WriteBundlesToFiles", + ParDo.of(new WriteBundlesToFiles<>(stepUuid, unwrittedRecordsTag, + maxNumWritersPerBundle, maxFileSize)) + .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); + PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles = + writeBundlesTuple.get(writtenFilesTag) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + + // If the bundles contain too many output tables to be written inline to files (due to memory + // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection. + // Group these records by key, and write the files after grouping. Since the record is grouped + // by key, we can ensure that only one file is open at a time in each bundle. + PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFilesGrouped = + writeBundlesTuple + .get(unwrittedRecordsTag) + .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of())) + .apply(GroupByKey.<ShardedKey<DestinationT>, TableRow>create()) + .apply( + ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize)) + .withSideInputs(tempFilePrefix)) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + // PCollection of filename, file byte size, and table destination. 
PCollection<WriteBundlesToFiles.Result<DestinationT>> results = - inputInGlobalWindow - .apply("WriteBundlesToFiles", ParDo.of( - new WriteBundlesToFiles<DestinationT>(stepUuid))) - .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + PCollectionList.of(writtenFiles).and(writtenFilesGrouped) + .apply(Flatten.<Result<DestinationT>>pCollections()); TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag = new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {}; @@ -164,16 +248,18 @@ class BatchLoads<DestinationT> // This transform will look at the set of files written for each table, and if any table has // too many files or bytes, will partition that table's files into multiple partitions for // loading. + PCollection<Void> singleton = p.apply(Create.of((Void) null).withCoder(VoidCoder.of())); PCollectionTuple partitions = singleton.apply( "WritePartition", ParDo.of( new WritePartition<>( singletonTable, + tempFilePrefix, resultsView, multiPartitionsTag, singlePartitionTag)) - .withSideInputs(resultsView) + .withSideInputs(tempFilePrefix, resultsView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); List<PCollectionView<?>> writeTablesSideInputs = http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 8fb05ff..cf258ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -658,21 +658,6 @@ public class BigQueryIO { /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> { - @VisibleForTesting - // Maximum number of files in a single partition. - static final int MAX_NUM_FILES = 10000; - - @VisibleForTesting - // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit. - static final long MAX_SIZE_BYTES = 11 * (1L << 40); - - // The maximum number of retry jobs. - static final int MAX_RETRY_JOBS = 3; - - // The maximum number of retries to poll the status of a job. - // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. - static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - @Nullable abstract ValueProvider<String> getJsonTableRef(); @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination> getTableFunction(); @@ -687,6 +672,8 @@ public class BigQueryIO { /** An option to indicate if table validation is desired. Default is true. 
*/ abstract boolean getValidate(); abstract BigQueryServices getBigQueryServices(); + @Nullable abstract Integer getMaxFilesPerBundle(); + @Nullable abstract Long getMaxFileSize(); abstract Builder<T> toBuilder(); @@ -704,6 +691,8 @@ public class BigQueryIO { abstract Builder<T> setTableDescription(String tableDescription); abstract Builder<T> setValidate(boolean validate); abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices); + abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle); + abstract Builder<T> setMaxFileSize(Long maxFileSize); abstract Write<T> build(); } @@ -882,6 +871,16 @@ public class BigQueryIO { return toBuilder().setBigQueryServices(testServices).build(); } + @VisibleForTesting + Write<T> withMaxFilesPerBundle(int maxFilesPerBundle) { + return toBuilder().setMaxFilesPerBundle(maxFilesPerBundle).build(); + } + + @VisibleForTesting + Write<T> withMaxFileSize(long maxFileSize) { + return toBuilder().setMaxFileSize(maxFileSize).build(); + } + @Override public void validate(PipelineOptions pipelineOptions) { BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class); @@ -993,6 +992,12 @@ public class BigQueryIO { dynamicDestinations, destinationCoder); batchLoads.setTestServices(getBigQueryServices()); + if (getMaxFilesPerBundle() != null) { + batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle()); + } + if (getMaxFileSize() != null) { + batchLoads.setMaxFileSize(getMaxFileSize()); + } return rowsWithDestination.apply(batchLoads); } } http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java index f04e9b9..7aefcfa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java @@ -63,8 +63,7 @@ class ShardedKeyCoder<KeyT> public ShardedKey<KeyT> decode(InputStream inStream, Context context) throws IOException { return new ShardedKey<>( - keyCoder.decode(inStream, context.nested()), - shardNumberCoder.decode(inStream, context)); + keyCoder.decode(inStream, context.nested()), shardNumberCoder.decode(inStream, context)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index f9d8785..e2db871 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -18,12 +18,15 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import static com.google.common.base.Preconditions.checkState; + import com.google.api.services.bigquery.model.TableRow; import com.google.common.io.CountingOutputStream; import 
java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; +import java.util.UUID; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.io.FileSystems; @@ -32,20 +35,19 @@ import org.apache.beam.sdk.util.MimeTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Writes {@TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */ -class TableRowWriter { - private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class); +/** Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */ +class TableRowWriter implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(TableRowWriter.class); private static final Coder<TableRow> CODER = TableRowJsonCoder.of(); private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final String tempFilePrefix; - private String id; private ResourceId resourceId; private WritableByteChannel channel; - protected String mimeType = MimeTypes.TEXT; private CountingOutputStream out; - public static final class Result { + private boolean isClosed = false; + + static final class Result { final ResourceId resourceId; final long byteSize; @@ -55,37 +57,32 @@ class TableRowWriter { } } - TableRowWriter(String basename) { - this.tempFilePrefix = basename; - } - - public final void open(String uId) throws Exception { - id = uId; - resourceId = FileSystems.matchNewResource(tempFilePrefix + id, false); - LOG.debug("Opening {}.", resourceId); - channel = FileSystems.create(resourceId, mimeType); - try { - out = new CountingOutputStream(Channels.newOutputStream(channel)); - LOG.debug("Writing header to {}.", resourceId); - } catch (Exception e) { - try { - LOG.error("Writing header to {} failed, closing channel.", resourceId); - channel.close(); - } catch (IOException closeException) { - LOG.error("Closing channel for {} failed", resourceId); - } - throw e; - } - LOG.debug("Starting write of bundle {} to {}.", this.id, resourceId); + TableRowWriter(String basename) throws Exception { + String uId = UUID.randomUUID().toString(); + resourceId = FileSystems.matchNewResource(basename + uId, false); + LOG.info("Opening TableRowWriter to {}.", resourceId); + channel = FileSystems.create(resourceId, MimeTypes.TEXT); + out = new CountingOutputStream(Channels.newOutputStream(channel)); } - public void write(TableRow value) throws Exception { + void write(TableRow value) throws Exception { CODER.encode(value, out, Context.OUTER); out.write(NEWLINE); } - public final Result close() throws IOException { + long getByteSize() { + return out.getCount(); + } + + @Override + public void close() throws IOException { + checkState(!isClosed, "Already closed"); + isClosed = true; channel.close(); + } + + Result getResult() { + checkState(isClosed, "Not yet closed"); return new Result(resourceId, out.getCount()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index b896083..890979b 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.TableRow; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.io.IOException; import java.io.InputStream; @@ -29,31 +30,43 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Writes each bundle of {@link TableRow} elements out to a separate file using {@link - * TableRowWriter}. + * Writes each bundle of {@link TableRow} elements out to separate file using {@link + * TableRowWriter}. Elements destined to different destinations are written to separate files. + * The transform will not write an element to a file if it is already writing to + * {@link #maxNumWritersPerBundle} files and the element is destined to a new destination. In this + * case, the element will be spilled into the output, and the {@link WriteGroupedRecordsToFiles} + * transform will take care of writing it to a file. */ class WriteBundlesToFiles<DestinationT> - extends DoFn<KV<DestinationT, TableRow>, WriteBundlesToFiles.Result<DestinationT>> { + extends DoFn<KV<DestinationT, TableRow>, Result<DestinationT>> { private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class); + // When we spill records, shard the output keys to prevent hotspots. Experiments running up to + // 10TB of data have shown a sharding of 10 to be a good choice. + private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; + // Map from tablespec to a writer for that table. private transient Map<DestinationT, TableRowWriter> writers; private transient Map<DestinationT, BoundedWindow> writerWindows; private final String stepUuid; - + private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag; + private int maxNumWritersPerBundle; + private long maxFileSize; /** * The result of the {@link WriteBundlesToFiles} transform. 
Corresponds to a single output file, @@ -115,30 +128,66 @@ class WriteBundlesToFiles<DestinationT> public void verifyDeterministic() {} } - WriteBundlesToFiles(String stepUuid) { + WriteBundlesToFiles( + String stepUuid, + TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag, + int maxNumWritersPerBundle, + long maxFileSize) { this.stepUuid = stepUuid; + this.unwrittedRecordsTag = unwrittedRecordsTag; + this.maxNumWritersPerBundle = maxNumWritersPerBundle; + this.maxFileSize = maxFileSize; } @StartBundle public void startBundle() { - // This must be done each bundle, as by default the {@link DoFn} might be reused between + // This must be done for each bundle, as by default the {@link DoFn} might be reused between // bundles. this.writers = Maps.newHashMap(); this.writerWindows = Maps.newHashMap(); } + TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePrefix, + BoundedWindow window) throws Exception { + TableRowWriter writer = new TableRowWriter(tempFilePrefix); + writers.put(destination, writer); + writerWindows.put(destination, window); + return writer; + } + @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { String tempFilePrefix = resolveTempLocation( c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid); - TableRowWriter writer = writers.get(c.element().getKey()); - if (writer == null) { - writer = new TableRowWriter(tempFilePrefix); - writer.open(UUID.randomUUID().toString()); - writers.put(c.element().getKey(), writer); - writerWindows.put(c.element().getKey(), window); - LOG.debug("Done opening writer {}", writer); + DestinationT destination = c.element().getKey(); + + TableRowWriter writer; + if (writers.containsKey(destination)) { + writer = writers.get(destination); + } else { + // Only create a new writer if we have fewer than maxNumWritersPerBundle already in this + // bundle. + if (writers.size() <= maxNumWritersPerBundle) { + writer = createAndInsertWriter(destination, tempFilePrefix, window); + } else { + // This means that we already had too many writers open in this bundle. "spill" this record + // into the output. It will be grouped and written to a file in a subsequent stage. + c.output(unwrittedRecordsTag, + KV.of(ShardedKey.of(destination, + ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR)), + c.element().getValue())); + return; + } } + + if (writer.getByteSize() > maxFileSize) { + // File is too big. Close it and open a new file. 
+ writer.close(); + TableRowWriter.Result result = writer.getResult(); + c.output(new Result<>(result.resourceId.toString(), result.byteSize, destination)); + writer = createAndInsertWriter(destination, tempFilePrefix, window); + } + try { writer.write(c.element().getValue()); } catch (Exception e) { @@ -156,14 +205,36 @@ class WriteBundlesToFiles<DestinationT> @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { + List<Exception> exceptionList = Lists.newArrayList(); + for (TableRowWriter writer : writers.values()) { + try { + writer.close(); + } catch (Exception e) { + exceptionList.add(e); + } + } + if (!exceptionList.isEmpty()) { + Exception e = new IOException("Failed to close some writers"); + for (Exception thrown : exceptionList) { + e.addSuppressed(thrown); + } + throw e; + } + for (Map.Entry<DestinationT, TableRowWriter> entry : writers.entrySet()) { - TableRowWriter.Result result = entry.getValue().close(); - c.output( - new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey()), - writerWindows.get(entry.getKey()).maxTimestamp(), - writerWindows.get(entry.getKey())); + try { + DestinationT destination = entry.getKey(); + TableRowWriter writer = entry.getValue(); + TableRowWriter.Result result = writer.getResult(); + c.output( + new Result<>(result.resourceId.toString(), result.byteSize, destination), + writerWindows.get(destination).maxTimestamp(), + writerWindows.get(destination)); + } catch (Exception e) { + exceptionList.add(e); + } } writers.clear(); - writerWindows.clear(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java new file mode 100644 index 0000000..45dc2a8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Receives elements grouped by their (sharded) destination, and writes them out to a file. 
+ * Since all the elements in the {@link Iterable} are destined to the same table, they are all + * written to the same file. Ensures that only one {@link TableRowWriter} is active per bundle. + */ +class WriteGroupedRecordsToFiles<DestinationT> + extends DoFn<KV<ShardedKey<DestinationT>, Iterable<TableRow>>, + WriteBundlesToFiles.Result<DestinationT>> { + private static final Logger LOG = LoggerFactory.getLogger(WriteGroupedRecordsToFiles.class); + + private final PCollectionView<String> tempFilePrefix; + private final long maxFileSize; + + WriteGroupedRecordsToFiles(PCollectionView<String> tempFilePrefix, long maxFileSize) { + this.tempFilePrefix = tempFilePrefix; + this.maxFileSize = maxFileSize; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + String tempFilePrefix = c.sideInput(this.tempFilePrefix); + TableRowWriter writer = new TableRowWriter(tempFilePrefix); + try (TableRowWriter ignored = writer) { + for (TableRow tableRow : c.element().getValue()) { + if (writer.getByteSize() > maxFileSize) { + writer.close(); + TableRowWriter.Result result = writer.getResult(); + c.output(new WriteBundlesToFiles.Result<>( + result.resourceId.toString(), result.byteSize, c.element().getKey().getKey())); + writer = new TableRowWriter(tempFilePrefix); + } + writer.write(tableRow); + } + } + TableRowWriter.Result result = writer.getResult(); + c.output( + new WriteBundlesToFiles.Result<>( + result.resourceId.toString(), result.byteSize, c.element().getKey().getKey())); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 66004b2..24693da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -22,8 +22,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; import java.util.Map; -import java.util.UUID; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; @@ -35,9 +33,10 @@ import org.apache.beam.sdk.values.TupleTag; * tablespec and the list of files corresponding to each partition of that table. */ class WritePartition<DestinationT> - extends DoFn<String, KV<ShardedKey<DestinationT>, List<String>>> { + extends DoFn<Void, KV<ShardedKey<DestinationT>, List<String>>> { private final boolean singletonTable; - private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView; + private final PCollectionView<String> tempFilePrefix; + private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results; private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag; private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag; @@ -73,8 +72,8 @@ class WritePartition<DestinationT> // Check to see whether we can add to this partition without exceeding the maximum partition // size. 
boolean canAccept(int numFiles, long numBytes) { - return this.numFiles + numFiles <= Write.MAX_NUM_FILES - && this.byteSize + numBytes <= Write.MAX_SIZE_BYTES; + return this.numFiles + numFiles <= BatchLoads.MAX_NUM_FILES + && this.byteSize + numBytes <= BatchLoads.MAX_SIZE_BYTES; } } @@ -101,11 +100,13 @@ class WritePartition<DestinationT> WritePartition( boolean singletonTable, - PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView, + PCollectionView<String> tempFilePrefix, + PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results, TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag, TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) { this.singletonTable = singletonTable; - this.resultsView = resultsView; + this.results = results; + this.tempFilePrefix = tempFilePrefix; this.multiPartitionsTag = multiPartitionsTag; this.singlePartitionTag = singlePartitionTag; } @@ -113,21 +114,20 @@ class WritePartition<DestinationT> @ProcessElement public void processElement(ProcessContext c) throws Exception { List<WriteBundlesToFiles.Result<DestinationT>> results = - Lists.newArrayList(c.sideInput(resultsView)); + Lists.newArrayList(c.sideInput(this.results)); // If there are no elements to write _and_ the user specified a constant output table, then // generate an empty table of that name. if (results.isEmpty() && singletonTable) { - TableRowWriter writer = new TableRowWriter(c.element()); - writer.open(UUID.randomUUID().toString()); - TableRowWriter.Result writerResult = writer.close(); - // Return a null destination in this case - the constant DynamicDestinations class will - // resolve it to the singleton output table. - results.add( - new Result<DestinationT>( - writerResult.resourceId.toString(), - writerResult.byteSize, - null)); + String tempFilePrefix = c.sideInput(this.tempFilePrefix); + TableRowWriter writer = new TableRowWriter(tempFilePrefix); + writer.close(); + TableRowWriter.Result writerResult = writer.getResult(); + // Return a null destination in this case - the constant DynamicDestinations class will + // resolve it to the singleton output table. 
+ results.add( + new Result<DestinationT>( + writerResult.resourceId.toString(), writerResult.byteSize, null)); } Map<DestinationT, DestinationData> currentResults = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index bf9d9f1..f641b32 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; @@ -43,7 +42,7 @@ import org.slf4j.LoggerFactory; /** * Copies temporary tables to destination table. */ -class WriteRename extends DoFn<String, Void> { +class WriteRename extends DoFn<Void, Void> { private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class); private final BigQueryServices bqServices; @@ -123,13 +122,13 @@ class WriteRename extends DoFn<String, Void> { String projectId = ref.getProjectId(); Job lastFailedCopyJob = null; - for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { + for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startCopyJob(jobRef, copyConfig); - Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); + Job copyJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); Status jobStatus = BigQueryHelpers.parseStatus(copyJob); switch (jobStatus) { case SUCCEEDED: @@ -154,7 +153,7 @@ class WriteRename extends DoFn<String, Void> { "Failed to create copy job with id prefix %s, " + "reached max retries: %d, last failed copy job: %s.", jobIdPrefix, - Write.MAX_RETRY_JOBS, + BatchLoads.MAX_RETRY_JOBS, BigQueryHelpers.jobToPrettyString(lastFailedCopyJob))); } http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java index bc18e8e..db0be3a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java @@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TupleTag; /** * The result of a {@link BigQueryIO.Write} transform. 
*/ -final class WriteResult implements POutput { +public final class WriteResult implements POutput { private final Pipeline pipeline; http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 83ff16b..c5494d8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; @@ -149,11 +148,11 @@ class WriteTables<DestinationT> String projectId = ref.getProjectId(); Job lastFailedLoadJob = null; - for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { + for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); - Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); + Job loadJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); Status jobStatus = BigQueryHelpers.parseStatus(loadJob); switch (jobStatus) { case SUCCEEDED: @@ -181,7 +180,7 @@ class WriteTables<DestinationT> "Failed to create load job with id prefix %s, " + "reached max retries: %d, last failed load job: %s.", jobIdPrefix, - Write.MAX_RETRY_JOBS, + BatchLoads.MAX_RETRY_JOBS, BigQueryHelpers.jobToPrettyString(lastFailedLoadJob))); } http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 0d3f000..d60c721 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -70,6 +70,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; @@ -490,9 +491,19 @@ public class BigQueryIOTest implements Serializable { p.apply("Create SideInput2", Create.of(KV.of("a", "a"), KV.of("b", "b"), KV.of("c", "c"))) .apply("AsMap", View.<String, String>asMap()); - PCollection<String> users = p.apply("CreateUsers", - Create.of("bill1", "sam2", 
"laurence3") - .withCoder(StringUtf8Coder.of())) + final List<String> allUsernames = ImmutableList.of("bill", "bob", "randolph"); + List<String> userList = Lists.newArrayList(); + // Make sure that we generate enough users so that WriteBundlesToFiles is forced to spill to + // WriteGroupedRecordsToFiles. + for (int i = 0; i < BatchLoads.DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE * 10; ++i) { + // Every user has 10 nicknames. + for (int j = 0; j < 1; ++j) { + String nickname = allUsernames.get( + ThreadLocalRandom.current().nextInt(allUsernames.size())); + userList.add(nickname + i); + } + } + PCollection<String> users = p.apply("CreateUsers", Create.of(userList)) .apply(Window.into(new PartitionedGlobalWindows<>( new SerializableFunction<String, String>() { @Override @@ -506,6 +517,8 @@ public class BigQueryIOTest implements Serializable { } users.apply("WriteBigQuery", BigQueryIO.<String>write() .withTestServices(fakeBqServices) + .withMaxFilesPerBundle(5) + .withMaxFileSize(10) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withFormatFunction(new SerializableFunction<String, TableRow>() { @Override @@ -567,12 +580,24 @@ public class BigQueryIOTest implements Serializable { File tempDir = new File(bqOptions.getTempLocation()); testNumFiles(tempDir, 0); - assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-1"), - containsInAnyOrder(new TableRow().set("name", "bill").set("id", 1))); - assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-2"), - containsInAnyOrder(new TableRow().set("name", "sam").set("id", 2))); - assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-3"), - containsInAnyOrder(new TableRow().set("name", "laurence").set("id", 3))); + Map<Integer, List<TableRow>> expectedTableRows = Maps.newHashMap(); + for (int i = 0; i < userList.size(); ++i) { + Matcher matcher = userPattern.matcher(userList.get(i)); + checkState(matcher.matches()); + String nickname = matcher.group(1); + int userid = Integer.valueOf(matcher.group(2)); + List<TableRow> expected = expectedTableRows.get(userid); + if (expected == null) { + expected = Lists.newArrayList(); + expectedTableRows.put(userid, expected); + } + expected.add(new TableRow().set("name", nickname).set("id", userid)); + } + + for (Map.Entry<Integer, List<TableRow>> entry : expectedTableRows.entrySet()) { + assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()), + containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class))); + } } @Test @@ -1655,7 +1680,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testWritePartitionSinglePartition() throws Exception { - long numFiles = BigQueryIO.Write.MAX_NUM_FILES; + long numFiles = BatchLoads.MAX_NUM_FILES; long fileSize = 1; // One partition is needed. @@ -1665,7 +1690,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testWritePartitionManyFiles() throws Exception { - long numFiles = BigQueryIO.Write.MAX_NUM_FILES * 3; + long numFiles = BatchLoads.MAX_NUM_FILES * 3; long fileSize = 1; // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files. @@ -1676,7 +1701,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testWritePartitionLargeFileSize() throws Exception { long numFiles = 10; - long fileSize = BigQueryIO.Write.MAX_SIZE_BYTES / 3; + long fileSize = BatchLoads.MAX_SIZE_BYTES / 3; // One partition is needed for each group of three files. 
long expectedNumPartitions = 4; @@ -1726,22 +1751,25 @@ public class BigQueryIOTest implements Serializable { TupleTag<KV<ShardedKey<String>, List<String>>> singlePartitionTag = new TupleTag<KV<ShardedKey<String>, List<String>>>("singlePartitionTag") {}; - PCollection<WriteBundlesToFiles.Result<String>> filesPCollection = - p.apply(Create.of(files) - .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of()))); PCollectionView<Iterable<WriteBundlesToFiles.Result<String>>> resultsView = - PCollectionViews.iterableView( - filesPCollection, - WindowingStrategy.globalDefault(), - WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of())); + p.apply( + Create.of(files) + .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of()))) + .apply(View.<WriteBundlesToFiles.Result<String>>asIterable()); + + String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath(); + PCollectionView<String> tempFilePrefixView = + p.apply(Create.of(tempFilePrefix)).apply(View.<String>asSingleton()); WritePartition<String> writePartition = - new WritePartition<>(isSingleton, resultsView, multiPartitionsTag, singlePartitionTag); + new WritePartition<>( + isSingleton, tempFilePrefixView, resultsView, multiPartitionsTag, singlePartitionTag); - DoFnTester<String, KV<ShardedKey<String>, List<String>>> tester = + DoFnTester<Void, KV<ShardedKey<String>, List<String>>> tester = DoFnTester.of(writePartition); tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files); - tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix); + tester.processElement(null); List<KV<ShardedKey<String>, List<String>>> partitions; if (expectedNumPartitionsPerTable > 1) { @@ -1897,11 +1925,10 @@ public class BigQueryIOTest implements Serializable { int numFiles = 10; List<String> fileNames = Lists.newArrayList(); String tempFilePrefix = bqOptions.getTempLocation() + "/"; - TableRowWriter writer = new TableRowWriter(tempFilePrefix); for (int i = 0; i < numFiles; ++i) { - String fileName = String.format("files%05d", i); - writer.open(fileName); - fileNames.add(writer.close().resourceId.toString()); + TableRowWriter writer = new TableRowWriter(tempFilePrefix); + writer.close(); + fileNames.add(writer.getResult().resourceId.toString()); } fileNames.add(tempFilePrefix + String.format("files%05d", numFiles)); @@ -1992,7 +2019,7 @@ public class BigQueryIOTest implements Serializable { CreateDisposition.CREATE_IF_NEEDED, tempTablesView); - DoFnTester<String, Void> tester = DoFnTester.of(writeRename); + DoFnTester<Void, Void> tester = DoFnTester.of(writeRename); tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); tester.processElement(null); http://git-wip-us.apache.org/repos/asf/beam/blob/1c7e35ae/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 50be0bb..ed6a0be 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -76,9 +76,9 @@ import org.joda.time.Duration; */ class FakeJobService implements JobService, Serializable { static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); - // Whenever a job is started, the first 5 calls to GetJob will report the job as pending, - // the next 5 will return the job as running, and only then will the job report as done. - private static final int GET_JOBS_TRANSITION_INTERVAL = 5; + // Whenever a job is started, the first 2 calls to GetJob will report the job as pending, + // the next 2 will return the job as running, and only then will the job report as done. + private static final int GET_JOBS_TRANSITION_INTERVAL = 2; private FakeDatasetService datasetService;
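
For readers skimming the diff, the core of this change is the writer-cap-and-spill behavior added to WriteBundlesToFiles: each bundle keeps at most maxNumWritersPerBundle per-destination TableRowWriters open (20 by default, since each writer buffers ~64 MB), and records for any further destinations are emitted to the unwrittenRecordsTag output under a ShardedKey (destination plus a random shard out of 10) so that a later GroupByKey + WriteGroupedRecordsToFiles stage can write them with only one file open at a time. The following is a minimal, self-contained sketch of that idea, not Beam code; SpillSketch, FakeWriter and ShardedDest are invented stand-ins for illustration only.

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Standalone sketch of the writer-cap-and-spill idea: keep at most
 * maxWritersPerBundle per-destination writers open in a bundle, and route
 * records for any further destinations to a spill output under a sharded key.
 */
public class SpillSketch {
  static final class FakeWriter {
    final List<String> rows = new ArrayList<>();
    void write(String row) { rows.add(row); }
  }

  /** Destination plus a random shard number, mirroring the role of ShardedKey<DestinationT>. */
  static final class ShardedDest {
    final String destination;
    final int shard;
    ShardedDest(String destination, int shard) { this.destination = destination; this.shard = shard; }
    @Override public String toString() { return destination + "#" + shard; }
  }

  private static final int SPILL_SHARDS = 10; // same role as SPILLED_RECORD_SHARDING_FACTOR
  private final int maxWritersPerBundle;      // same role as DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE
  final Map<String, FakeWriter> writers = new HashMap<>();
  final List<SimpleEntry<ShardedDest, String>> spilled = new ArrayList<>();

  SpillSketch(int maxWritersPerBundle) { this.maxWritersPerBundle = maxWritersPerBundle; }

  /** Write the row inline if a writer is (or can be) open for its destination; otherwise spill it. */
  void process(String destination, String row) {
    FakeWriter writer = writers.get(destination);
    if (writer == null) {
      if (writers.size() < maxWritersPerBundle) {
        writer = new FakeWriter();
        writers.put(destination, writer);
      } else {
        // Too many destinations already open in this bundle: emit the record to the spill
        // output under a random shard so the downstream group-by-key does not hotspot.
        int shard = ThreadLocalRandom.current().nextInt(SPILL_SHARDS);
        spilled.add(new SimpleEntry<>(new ShardedDest(destination, shard), row));
        return;
      }
    }
    writer.write(row);
  }

  public static void main(String[] args) {
    SpillSketch bundle = new SpillSketch(2);
    bundle.process("table_a", "{\"id\":1}");
    bundle.process("table_b", "{\"id\":2}");
    bundle.process("table_c", "{\"id\":3}"); // third destination in the bundle -> spilled
    System.out.println("open writers:    " + bundle.writers.keySet());
    System.out.println("spilled records: " + bundle.spilled);
  }
}

In the real transform the spilled records are grouped by ShardedKey before WriteGroupedRecordsToFiles runs, which is what bounds memory use: the grouped stage never needs more than one open writer per bundle. The updated BigQueryIOTest forces this path by calling withMaxFilesPerBundle(5) and withMaxFileSize(10) (both @VisibleForTesting) while generating far more destinations than the writer cap.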
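
The other mechanism introduced here, in WriteGroupedRecordsToFiles and in the maxFileSize check added to WriteBundlesToFiles, is file rotation: when the bytes written to the current file exceed maxFileSize, the file is closed and reported, and a fresh file is started. Below is a sketch of that loop under the assumption of plain java.nio temp files rather than Beam's FileSystems/TableRowWriter API; writeGrouped and FileResult are hypothetical names used only for this illustration.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Standalone sketch of the grouped-write stage: all rows for one (sharded)
 * destination arrive together, so only one file is open at a time, and the
 * file is rotated whenever it grows past maxFileSize.
 */
public class GroupedWriteSketch {

  /** Mirrors the (file name, byte size) pair that the real transform emits per file. */
  static final class FileResult {
    final Path path;
    final long byteSize;
    FileResult(Path path, long byteSize) { this.path = path; this.byteSize = byteSize; }
    @Override public String toString() { return path + " (" + byteSize + " bytes)"; }
  }

  static List<FileResult> writeGrouped(Iterable<String> rows, Path tempDir, long maxFileSize)
      throws IOException {
    List<FileResult> results = new ArrayList<>();
    Path current = Files.createTempFile(tempDir, "rows", ".json");
    OutputStream out = Files.newOutputStream(current);
    long written = 0;
    for (String row : rows) {
      if (written > maxFileSize) {
        // Current file is over the limit: close it, report it, and start a new one.
        out.close();
        results.add(new FileResult(current, written));
        current = Files.createTempFile(tempDir, "rows", ".json");
        out = Files.newOutputStream(current);
        written = 0;
      }
      byte[] bytes = (row + "\n").getBytes(StandardCharsets.UTF_8);
      out.write(bytes);
      written += bytes.length;
    }
    out.close();
    results.add(new FileResult(current, written));
    return results;
  }

  public static void main(String[] args) throws IOException {
    Path tempDir = Files.createTempDirectory("grouped-write-sketch");
    List<String> rows = Arrays.asList("{\"id\":1}", "{\"id\":2}", "{\"id\":3}", "{\"id\":4}");
    // A tiny limit forces a rotation after roughly every row, similar in spirit to the
    // withMaxFileSize(10) setting used in the updated BigQueryIOTest.
    System.out.println(writeGrouped(rows, tempDir, 10));
  }
}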
