[BEAM-383] Modified BigQueryIO to partition writes based on the number of files and total file size
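Overview: this change replaces the single FileBasedSink-backed load with a multi-stage write. Rows are first spilled to temporary files (WriteBundles); the resulting (file name, byte count) pairs are grouped into partitions capped at MAX_NUM_FILES files and MAX_SIZE_BYTES bytes (WritePartition); each partition is imported with its own BigQuery load job (WriteTables); and, when there is more than one partition, the per-partition temporary tables are copied into the destination table (WriteRename). The sketch below restates the partitioning rule outside the DoFn for readability; the constants match the diff, but the standalone class and List-based signature are illustrative only and are not part of the commit.

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.values.KV;

/**
 * Illustrative restatement of the partitioning rule added in WritePartition.
 * MAX_NUM_FILES and MAX_SIZE_BYTES mirror the constants added to Write.Bound.
 */
class PartitionSketch {
  static final int MAX_NUM_FILES = 10000;             // max files per load job
  static final long MAX_SIZE_BYTES = 3 * (1L << 40);  // max bytes per load job (3 TB)

  /** Groups (fileName, sizeBytes) results into partitions that respect both caps. */
  static List<List<String>> partition(List<KV<String, Long>> results) {
    List<List<String>> partitions = new ArrayList<>();
    List<String> current = new ArrayList<>();
    int currNumFiles = 0;
    long currSizeBytes = 0;
    for (KV<String, Long> result : results) {
      if (currNumFiles + 1 > MAX_NUM_FILES
          || currSizeBytes + result.getValue() > MAX_SIZE_BYTES) {
        // Close the full partition and start a new one.
        partitions.add(current);
        current = new ArrayList<>();
        currNumFiles = 0;
        currSizeBytes = 0;
      }
      ++currNumFiles;
      currSizeBytes += result.getValue();
      current.add(result.getKey());
    }
    // The last partition is always emitted.
    partitions.add(current);
    return partitions;
  }
}

If everything fits in one partition, the load job writes directly to the destination table with the user-supplied write/create dispositions; otherwise each partition is loaded into a temporary table with WRITE_EMPTY/CREATE_IF_NEEDED and the final copy job applies the user-supplied dispositions.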
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8db6114e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8db6114e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8db6114e Branch: refs/heads/python-sdk Commit: 8db6114e2087cafc4369f6ec85b04f978dfb1984 Parents: 595d2d4 Author: Ian Zhou <[email protected]> Authored: Wed Jul 20 15:56:21 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Aug 3 23:40:27 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 585 ++++++++++++++----- .../sdk/io/gcp/bigquery/BigQueryServices.java | 7 + .../io/gcp/bigquery/BigQueryServicesImpl.java | 51 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 213 ++++++- 4 files changed, 693 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 8741c9c..2ba7562 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -33,9 +34,6 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; @@ -44,6 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -52,7 +51,13 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import 
org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FileIOChannelFactory; +import org.apache.beam.sdk.util.GcsIOChannelFactory; +import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -80,6 +85,7 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; @@ -93,6 +99,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.io.CountingOutputStream; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -110,6 +117,8 @@ import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -196,7 +205,8 @@ import javax.annotation.Nullable; * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should * append to an existing table, replace the table, or verify that the table is * empty. Note that the dataset being written to must already exist. Unbounded PCollections can only - * be written using {@link WriteDisposition#WRITE_EMPTY} or {@link WriteDisposition#WRITE_APPEND}. + * be written using {@link Write.WriteDisposition#WRITE_EMPTY} or + * {@link Write.WriteDisposition#WRITE_APPEND}. * * <h3>Sharding BigQuery output tables</h3> * <p>A common use case is to dynamically generate BigQuery table names based on @@ -1412,6 +1422,19 @@ public class BigQueryIO { * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table. */ public static class Bound extends PTransform<PCollection<TableRow>, PDone> { + // Maximum number of files in a single partition. + static final int MAX_NUM_FILES = 10000; + + // Maximum number of bytes in a single partition. + static final long MAX_SIZE_BYTES = 3 * (1L << 40); + + // The maximum number of retry jobs. + static final int MAX_RETRY_JOBS = 3; + + // The maximum number of retries to poll the status of a job. + // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. 
+ static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + @Nullable final String jsonTableRef; @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; @@ -1666,7 +1689,8 @@ public class BigQueryIO { @Override public PDone apply(PCollection<TableRow> input) { - BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); + Pipeline p = input.getPipeline(); + BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); BigQueryServices bqServices = getBigQueryServices(); // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup @@ -1680,13 +1704,13 @@ public class BigQueryIO { if (Strings.isNullOrEmpty(table.getProjectId())) { table.setProjectId(options.getProject()); } - String jobIdToken = randomUUIDString(); + String jobIdToken = "beam_job_" + randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; try { IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); tempFilePrefix = factory.resolve( - factory.resolve(tempLocation, "BigQuerySinkTemp"), + factory.resolve(tempLocation, "BigQueryWriteTemp"), jobIdToken); } catch (IOException e) { throw new RuntimeException( @@ -1694,16 +1718,120 @@ public class BigQueryIO { e); } - return input.apply("Write", org.apache.beam.sdk.io.Write.to( - new BigQuerySink( + PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix)); + + PCollection<TableRow> inputInGlobalWindow = + input.apply( + Window.<TableRow>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> results = inputInGlobalWindow + .apply("WriteBundles", + ParDo.of(new WriteBundles(tempFilePrefix))); + + TupleTag<KV<Long, List<String>>> multiPartitionsTag = + new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<Long, List<String>>> singlePartitionTag = + new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; + + PCollectionView<Iterable<KV<String, Long>>> resultsView = results + .apply("ResultsView", View.<KV<String, Long>>asIterable()); + PCollectionTuple partitions = singleton.apply(ParDo + .of(new WritePartition( + resultsView, + multiPartitionsTag, + singlePartitionTag)) + .withSideInputs(resultsView) + .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); + + // Write multiple partitions to separate temporary tables + PCollection<String> tempTables = partitions.get(multiPartitionsTag) + .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create()) + .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( + false, + bqServices, jobIdToken, - table, + tempFilePrefix, + toJsonString(table), jsonSchema, - getWriteDisposition(), - getCreateDisposition(), + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED))); + + PCollectionView<Iterable<String>> tempTablesView = tempTables + .apply("TempTablesView", View.<String>asIterable()); + singleton.apply(ParDo + .of(new WriteRename( + bqServices, + jobIdToken, + toJsonString(table), + writeDisposition, + createDisposition, + tempTablesView)) + .withSideInputs(tempTablesView)); + + // Write single partition to final table + partitions.get(singlePartitionTag) + .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create()) + .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( + true, + bqServices, + jobIdToken, tempFilePrefix, - input.getCoder(), - bqServices))); + toJsonString(table), + jsonSchema, + 
writeDisposition, + createDisposition))); + + return PDone.in(input.getPipeline()); + } + + private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> { + private TableRowWriter writer = null; + private final String tempFilePrefix; + + WriteBundles(String tempFilePrefix) { + this.tempFilePrefix = tempFilePrefix; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + if (writer == null) { + writer = new TableRowWriter(tempFilePrefix); + writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {}", writer); + } + try { + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this OldDoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + } + + @Override + public void finishBundle(Context c) throws Exception { + if (writer != null) { + c.output(writer.close()); + writer = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) + .withLabel("Temporary File Prefix")); + } } @Override @@ -1784,192 +1912,361 @@ public class BigQueryIO { } } - /** Disallow construction of utility class. */ - private Write() {} - } - - /** - * {@link BigQuerySink} is implemented as a {@link FileBasedSink}. - * - * <p>It uses BigQuery load job to import files into BigQuery. - */ - static class BigQuerySink extends FileBasedSink<TableRow> { - private final String jobIdToken; - @Nullable private final String jsonTable; - @Nullable private final String jsonSchema; - private final WriteDisposition writeDisposition; - private final CreateDisposition createDisposition; - private final Coder<TableRow> coder; - private final BigQueryServices bqServices; - - public BigQuerySink( - String jobIdToken, - @Nullable TableReference table, - @Nullable String jsonSchema, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - String tempFile, - Coder<TableRow> coder, - BigQueryServices bqServices) { - super(tempFile, ".json"); - this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); - if (table == null) { - this.jsonTable = null; - } else { - checkArgument(!Strings.isNullOrEmpty(table.getProjectId()), - "Table %s should have a project specified", table); - this.jsonTable = toJsonString(table); - } - this.jsonSchema = jsonSchema; - this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); - this.createDisposition = checkNotNull(createDisposition, "createDisposition"); - this.coder = checkNotNull(coder, "coder"); - this.bqServices = checkNotNull(bqServices, "bqServices"); - } - - @Override - public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation( - PipelineOptions options) { - return new BigQueryWriteOperation(this); - } + static class TableRowWriter { + private static final Coder<TableRow> CODER = TableRowJsonCoder.of(); + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private final String tempFilePrefix; + private String id; + private String fileName; + private WritableByteChannel channel; + protected String mimeType = MimeTypes.TEXT; + private CountingOutputStream out; + + TableRowWriter(String basename) { + this.tempFilePrefix = basename; + } + + public final void open(String uId) throws 
Exception { + id = uId; + fileName = tempFilePrefix + id; + LOG.debug("Opening {}.", fileName); + channel = IOChannelUtils.create(fileName, mimeType); + try { + out = new CountingOutputStream(Channels.newOutputStream(channel)); + LOG.debug("Writing header to {}.", fileName); + } catch (Exception e) { + try { + LOG.error("Writing header to {} failed, closing channel.", fileName); + channel.close(); + } catch (IOException closeException) { + LOG.error("Closing channel for {} failed", fileName); + } + throw e; + } + LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); + public void write(TableRow value) throws Exception { + CODER.encode(value, out, Context.OUTER); + out.write(NEWLINE); + } - builder - .addIfNotNull(DisplayData.item("schema", jsonSchema) - .withLabel("Table Schema")) - .addIfNotNull(DisplayData.item("tableSpec", jsonTable) - .withLabel("Table Specification")); + public final KV<String, Long> close() throws IOException { + channel.close(); + return KV.of(fileName, out.getCount()); + } } - private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> { - // The maximum number of retry load jobs. - private static final int MAX_RETRY_LOAD_JOBS = 3; + /** + * Partitions temporary files based on number of files and file sizes. + */ + static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> { + private final PCollectionView<Iterable<KV<String, Long>>> resultsView; + private TupleTag<KV<Long, List<String>>> multiPartitionsTag; + private TupleTag<KV<Long, List<String>>> singlePartitionTag; - // The maximum number of retries to poll the status of a load job. - // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. 
- private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + public WritePartition( + PCollectionView<Iterable<KV<String, Long>>> resultsView, + TupleTag<KV<Long, List<String>>> multiPartitionsTag, + TupleTag<KV<Long, List<String>>> singlePartitionTag) { + this.resultsView = resultsView; + this.multiPartitionsTag = multiPartitionsTag; + this.singlePartitionTag = singlePartitionTag; + } - private final BigQuerySink bigQuerySink; + @Override + public void processElement(ProcessContext c) throws Exception { + List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView)); + if (results.isEmpty()) { + TableRowWriter writer = new TableRowWriter(c.element()); + writer.open(UUID.randomUUID().toString()); + results.add(writer.close()); + } - private BigQueryWriteOperation(BigQuerySink sink) { - super(checkNotNull(sink, "sink")); - this.bigQuerySink = sink; + long partitionId = 0; + int currNumFiles = 0; + long currSizeBytes = 0; + List<String> currResults = Lists.newArrayList(); + for (int i = 0; i < results.size(); ++i) { + KV<String, Long> fileResult = results.get(i); + if (currNumFiles + 1 > Bound.MAX_NUM_FILES + || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) { + c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); + currResults = Lists.newArrayList(); + currNumFiles = 0; + currSizeBytes = 0; + } + ++currNumFiles; + currSizeBytes += fileResult.getValue(); + currResults.add(fileResult.getKey()); + } + if (partitionId == 0) { + c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults)); + } else { + c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); + } } @Override - public FileBasedWriter<TableRow> createWriter(PipelineOptions options) throws Exception { - return new TableRowWriter(this, bigQuerySink.coder); + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + } + } + + /** + * Writes partitions to BigQuery tables. 
+ */ + static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> { + private final boolean singlePartition; + private final BigQueryServices bqServices; + private final String jobIdToken; + private final String tempFilePrefix; + private final String jsonTableRef; + private final String jsonSchema; + private final WriteDisposition writeDisposition; + private final CreateDisposition createDisposition; + + public WriteTables( + boolean singlePartition, + BigQueryServices bqServices, + String jobIdToken, + String tempFilePrefix, + String jsonTableRef, + String jsonSchema, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) { + this.singlePartition = singlePartition; + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.tempFilePrefix = tempFilePrefix; + this.jsonTableRef = jsonTableRef; + this.jsonSchema = jsonSchema; + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; } @Override - public void finalize(Iterable<FileResult> writerResults, PipelineOptions options) - throws IOException, InterruptedException { - try { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - List<String> tempFiles = Lists.newArrayList(); - for (FileResult result : writerResults) { - tempFiles.add(result.getFilename()); - } - if (!tempFiles.isEmpty()) { - load( - bigQuerySink.bqServices.getJobService(bqOptions), - bigQuerySink.jobIdToken, - fromJsonString(bigQuerySink.jsonTable, TableReference.class), - tempFiles, - fromJsonString(bigQuerySink.jsonSchema, TableSchema.class), - bigQuerySink.writeDisposition, - bigQuerySink.createDisposition); - } - } finally { - removeTemporaryFiles(options); + public void processElement(ProcessContext c) throws Exception { + List<String> partition = Lists.newArrayList(c.element().getValue()).get(0); + String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); + TableReference ref = fromJsonString(jsonTableRef, TableReference.class); + if (!singlePartition) { + ref.setTableId(jobIdPrefix); } + + load( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + ref, + fromJsonString(jsonSchema, TableSchema.class), + partition, + writeDisposition, + createDisposition); + c.output(toJsonString(ref)); + + removeTemporaryFiles(c.getPipelineOptions(), partition); } - /** - * Import files into BigQuery with load jobs. - * - * <p>Returns if files are successfully loaded into BigQuery. - * Throws a RuntimeException if: - * 1. The status of one load job is UNKNOWN. This is to avoid duplicating data. - * 2. It exceeds {@code MAX_RETRY_LOAD_JOBS}. - * - * <p>If a load job failed, it will try another load job with a different job id. 
- */ private void load( JobService jobService, String jobIdPrefix, TableReference ref, - List<String> gcsUris, @Nullable TableSchema schema, + List<String> gcsUris, WriteDisposition writeDisposition, CreateDisposition createDisposition) throws InterruptedException, IOException { JobConfigurationLoad loadConfig = new JobConfigurationLoad() - .setSourceUris(gcsUris) .setDestinationTable(ref) .setSchema(schema) + .setSourceUris(gcsUris) .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()) .setSourceFormat("NEWLINE_DELIMITED_JSON"); - boolean retrying = false; String projectId = ref.getProjectId(); - for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) { + for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - if (retrying) { - LOG.info("Previous load jobs failed, retrying."); - } - LOG.info("Starting BigQuery load job: {}", jobId); + LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); Status jobStatus = - parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES)); + parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); switch (jobStatus) { case SUCCEEDED: return; case UNKNOWN: - throw new RuntimeException("Failed to poll the load job status."); + throw new RuntimeException("Failed to poll the load job status of job " + jobId); case FAILED: LOG.info("BigQuery load job failed: {}", jobId); - retrying = true; continue; default: - throw new IllegalStateException("Unexpected job status: " + jobStatus); + throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", + jobStatus, jobId)); } } - throw new RuntimeException( - "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS); + throw new RuntimeException(String.format("Failed to create the load job %s, reached max " + + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); + } + + private void removeTemporaryFiles(PipelineOptions options, Collection<String> matches) + throws IOException { + String pattern = tempFilePrefix + "*"; + LOG.debug("Finding temporary files matching {}.", pattern); + IOChannelFactory factory = IOChannelUtils.getFactory(pattern); + if (factory instanceof GcsIOChannelFactory) { + GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options); + gcsUtil.remove(matches); + } else if (factory instanceof FileIOChannelFactory) { + for (String filename : matches) { + LOG.debug("Removing file {}", filename); + boolean exists = Files.deleteIfExists(Paths.get(filename)); + if (!exists) { + LOG.debug("{} does not exist.", filename); + } + } + } else { + throw new IOException("Unrecognized file system."); + } } - } - private static class TableRowWriter extends FileBasedWriter<TableRow> { - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final Coder<TableRow> coder; - private OutputStream out; + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - public TableRowWriter( - FileBasedWriteOperation<TableRow> writeOperation, Coder<TableRow> coder) { - super(writeOperation); - this.mimeType = MimeTypes.TEXT; - this.coder = coder; + builder + .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken) + .withLabel("Job ID Token")) + .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) + .withLabel("Temporary File Prefix")) + 
.addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) + .withLabel("Table Reference")) + .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema) + .withLabel("Table Schema")); + } + } + + /** + * Copies temporary tables to destination table. + */ + static class WriteRename extends OldDoFn<String, Void> { + private final BigQueryServices bqServices; + private final String jobIdToken; + private final String jsonTableRef; + private final WriteDisposition writeDisposition; + private final CreateDisposition createDisposition; + private final PCollectionView<Iterable<String>> tempTablesView; + + public WriteRename( + BigQueryServices bqServices, + String jobIdToken, + String jsonTableRef, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + PCollectionView<Iterable<String>> tempTablesView) { + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.jsonTableRef = jsonTableRef; + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; + this.tempTablesView = tempTablesView; } @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - out = Channels.newOutputStream(channel); + public void processElement(ProcessContext c) throws Exception { + List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); + + // Do not copy if not temp tables are provided + if (tempTablesJson.size() == 0) { + return; + } + + List<TableReference> tempTables = Lists.newArrayList(); + for (String table : tempTablesJson) { + tempTables.add(fromJsonString(table, TableReference.class)); + } + copy( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdToken, + fromJsonString(jsonTableRef, TableReference.class), + tempTables, + writeDisposition, + createDisposition); + + DatasetService tableService = + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + removeTemporaryTables(tableService, tempTables); + } + + private void copy( + JobService jobService, + String jobIdPrefix, + TableReference ref, + List<TableReference> tempTables, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) throws InterruptedException, IOException { + JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() + .setSourceTables(tempTables) + .setDestinationTable(ref) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()); + + String projectId = ref.getProjectId(); + for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { + String jobId = jobIdPrefix + "-" + i; + LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); + JobReference jobRef = new JobReference() + .setProjectId(projectId) + .setJobId(jobId); + jobService.startCopyJob(jobRef, copyConfig); + Status jobStatus = + parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); + switch (jobStatus) { + case SUCCEEDED: + return; + case UNKNOWN: + throw new RuntimeException("Failed to poll the copy job status of job " + jobId); + case FAILED: + LOG.info("BigQuery copy job failed: {}", jobId); + continue; + default: + throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", + jobStatus, jobId)); + } + } + throw new RuntimeException(String.format("Failed to create the copy job %s, reached max " + + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); + } + + private void removeTemporaryTables(DatasetService tableService, + List<TableReference> tempTables) throws Exception { + 
for (TableReference tableRef : tempTables) { + tableService.deleteTable( + tableRef.getProjectId(), + tableRef.getDatasetId(), + tableRef.getTableId()); + } } @Override - public void write(TableRow value) throws Exception { - // Use Context.OUTER to encode and NEWLINE as the delimeter. - coder.encode(value, out, Context.OUTER); - out.write(NEWLINE); + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken) + .withLabel("Job ID Token")) + .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) + .withLabel("Table Reference")) + .add(DisplayData.item("writeDisposition", writeDisposition.toString()) + .withLabel("Write Disposition")) + .add(DisplayData.item("createDisposition", createDisposition.toString()) + .withLabel("Create Disposition")); } } + + /** Disallow construction of utility class. */ + private Write() {} } private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { @@ -2093,8 +2390,8 @@ public class BigQueryIO { TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class); Bigquery client = Transport.newBigQueryClient(options).build(); BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); - inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND, - CreateDisposition.CREATE_IF_NEEDED, tableSchema); + inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND, + Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema); createdTables.add(tableSpec); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 29a335d..0af6df8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.Table; @@ -83,6 +84,12 @@ interface BigQueryServices extends Serializable { throws IOException, InterruptedException; /** + * Start a BigQuery copy job. + */ + void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) + throws IOException, InterruptedException; + + /** * Waits for the job is Done, and returns the job. * * <p>Returns null if the {@code maxAttempts} retries reached. 
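For context, a minimal sketch of how a caller might build the arguments for the new startCopyJob method on BigQueryServices.JobService. It uses only the BigQuery model classes already referenced in this diff; the project, dataset, table and job names are placeholders, and the sketch assumes it lives in the same package as the package-private BigQueryServices interface.

import java.io.IOException;

import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import com.google.common.collect.ImmutableList;

class CopyJobSketch {
  /** Copies one temporary table into the destination table via the new startCopyJob hook. */
  static void copyTempTableToDestination(BigQueryServices.JobService jobService)
      throws IOException, InterruptedException {
    // Placeholder destination and temporary table references.
    TableReference destination = new TableReference()
        .setProjectId("my-project").setDatasetId("my_dataset").setTableId("my_table");
    TableReference tempTable = new TableReference()
        .setProjectId("my-project").setDatasetId("my_dataset").setTableId("beam_job_xxx_00001");

    JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
        .setSourceTables(ImmutableList.of(tempTable))
        .setDestinationTable(destination)
        .setWriteDisposition("WRITE_EMPTY")
        .setCreateDisposition("CREATE_IF_NEEDED");

    JobReference jobRef = new JobReference()
        .setProjectId("my-project")
        .setJobId("beam_job_xxx-0");  // placeholder job id

    jobService.startCopyJob(jobRef, copyConfig);
  }
}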
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index ef17e0f..bd1097f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -39,6 +39,7 @@ import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; @@ -124,9 +125,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void startLoadJob( @@ -142,9 +143,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) @@ -160,9 +161,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig) @@ -175,6 +176,24 @@ class BigQueryServicesImpl implements BigQueryServices { startJob(job, errorExtractor, client); } + /** + * {@inheritDoc} + * + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. 
+ */ + @Override + public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) + throws IOException, InterruptedException { + Job job = new Job() + .setJobReference(jobRef) + .setConfiguration( + new JobConfiguration().setCopy(copyConfig)); + + startJob(job, errorExtractor, client); + } + private static void startJob(Job job, ApiErrorExtractor errorExtractor, Bigquery client) throws IOException, InterruptedException { @@ -320,9 +339,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public Table getTable(String projectId, String datasetId, String tableId) @@ -341,9 +360,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void deleteTable(String projectId, String datasetId, String tableId) @@ -377,9 +396,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public Dataset getDataset(String projectId, String datasetId) @@ -398,9 +417,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void createDataset( @@ -456,9 +475,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. 
*/ @Override public void deleteDataset(String projectId, String datasetId) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 7d2df62..1ea1f94 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -26,14 +26,17 @@ import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource; @@ -44,6 +47,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WritePartition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; @@ -58,16 +64,23 @@ import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; import com.google.api.client.util.Data; import com.google.api.client.util.Strings; 
@@ -76,6 +89,7 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatistics2; @@ -110,6 +124,9 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -123,6 +140,8 @@ import javax.annotation.Nullable; @RunWith(JUnit4.class) public class BigQueryIOTest implements Serializable { + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + // Status.UNKNOWN maps to null private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of( Status.SUCCEEDED, new Job().setStatus(new JobStatus()), @@ -275,6 +294,12 @@ public class BigQueryIOTest implements Serializable { } @Override + public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) + throws IOException, InterruptedException { + startJob(jobRef); + } + + @Override public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { if (!Strings.isNullOrEmpty(executingProject)) { @@ -565,7 +590,8 @@ public class BigQueryIOTest implements Serializable { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done", "done") - .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); + .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -584,7 +610,6 @@ public class BigQueryIOTest implements Serializable { p.run(); logged.verifyInfo("Starting BigQuery load job"); - logged.verifyInfo("Previous load jobs failed, retrying."); File tempDir = new File(bqOptions.getTempLocation()); assertEquals(0, tempDir.listFiles(new FileFilter() { @Override @@ -613,7 +638,7 @@ public class BigQueryIOTest implements Serializable { .withoutValidation()); thrown.expect(RuntimeException.class); - thrown.expectMessage("Failed to poll the load job status."); + thrown.expectMessage("Failed to poll the load job status"); p.run(); File tempDir = new File(bqOptions.getTempLocation()); @@ -1228,4 +1253,186 @@ public class BigQueryIOTest implements Serializable { p.run(); } + + @Test + public void testWritePartitionEmptyData() throws Exception { + final long numFiles = 0; + final long fileSize = 0; + + // An empty file is created for no input data. One partition is needed. + final long expectedNumPartitions = 1; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + + @Test + public void testWritePartitionSinglePartition() throws Exception { + final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES; + final long fileSize = 1; + + // One partition is needed. 
+ final long expectedNumPartitions = 1; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + + @Test + public void testWritePartitionManyFiles() throws Exception { + final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3; + final long fileSize = 1; + + // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files. + final long expectedNumPartitions = 3; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + + @Test + public void testWritePartitionLargeFileSize() throws Exception { + final long numFiles = 10; + final long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3; + + // One partition is needed for each group of three files. + final long expectedNumPartitions = 4; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + + private void testWritePartition(long numFiles, long fileSize, long expectedNumPartitions) + throws Exception { + final List<Long> expectedPartitionIds = Lists.newArrayList(); + for (long i = 1; i <= expectedNumPartitions; ++i) { + expectedPartitionIds.add(i); + } + + final List<KV<String, Long>> files = Lists.newArrayList(); + final List<String> fileNames = Lists.newArrayList(); + for (int i = 0; i < numFiles; ++i) { + String fileName = String.format("files%05d", i); + fileNames.add(fileName); + files.add(KV.of(fileName, fileSize)); + } + + TupleTag<KV<Long, List<String>>> multiPartitionsTag = + new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<Long, List<String>>> singlePartitionTag = + new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; + + final PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView( + TestPipeline.create(), + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); + + WritePartition writePartition = + new WritePartition(filesView, multiPartitionsTag, singlePartitionTag); + + DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition); + tester.setSideInput(filesView, GlobalWindow.INSTANCE, files); + tester.processElement(tmpFolder.getRoot().getAbsolutePath()); + + List<KV<Long, List<String>>> partitions; + if (expectedNumPartitions > 1) { + partitions = tester.takeSideOutputElements(multiPartitionsTag); + } else { + partitions = tester.takeSideOutputElements(singlePartitionTag); + } + List<Long> partitionIds = Lists.newArrayList(); + List<String> partitionFileNames = Lists.newArrayList(); + for (KV<Long, List<String>> partition : partitions) { + partitionIds.add(partition.getKey()); + for (String name : partition.getValue()) { + partitionFileNames.add(name); + } + } + + assertEquals(expectedPartitionIds, partitionIds); + if (numFiles == 0) { + assertThat(partitionFileNames, Matchers.hasSize(1)); + assertTrue(Files.exists(Paths.get(partitionFileNames.get(0)))); + assertThat(Files.readAllBytes(Paths.get(partitionFileNames.get(0))).length, + Matchers.equalTo(0)); + } else { + assertEquals(fileNames, partitionFileNames); + } + } + + @Test + public void testWriteTables() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done", "done") + .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); + + final long numPartitions = 3; + final long numFilesPerPartition = 10; + final String jobIdToken = "jobIdToken"; + final String tempFilePrefix = "tempFilePrefix"; + final String 
jsonTable = "{}"; + final String jsonSchema = "{}"; + final List<String> expectedTempTables = Lists.newArrayList(); + + final List<KV<Long, Iterable<List<String>>>> partitions = Lists.newArrayList(); + for (long i = 0; i < numPartitions; ++i) { + List<String> filesPerPartition = Lists.newArrayList(); + for (int j = 0; j < numFilesPerPartition; ++j) { + filesPerPartition.add(String.format("files%05d", j)); + } + partitions.add(KV.of(i, (Iterable<List<String>>) Collections.singleton(filesPerPartition))); + expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); + } + + WriteTables writeTables = new WriteTables( + false, + fakeBqServices, + jobIdToken, + tempFilePrefix, + jsonTable, + jsonSchema, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED); + + DoFnTester<KV<Long, Iterable<List<String>>>, String> tester = DoFnTester.of(writeTables); + for (KV<Long, Iterable<List<String>>> partition : partitions) { + tester.processElement(partition); + } + + List<String> tempTables = tester.takeOutputElements(); + + logged.verifyInfo("Starting BigQuery load job"); + + assertEquals(expectedTempTables, tempTables); + } + + @Test + public void testWriteRename() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done") + .pollJobReturns(Status.FAILED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); + + final long numTempTables = 3; + final String jobIdToken = "jobIdToken"; + final String jsonTable = "{}"; + final List<String> tempTables = Lists.newArrayList(); + for (long i = 0; i < numTempTables; ++i) { + tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); + } + + final PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView( + TestPipeline.create(), + WindowingStrategy.globalDefault(), + StringUtf8Coder.of()); + + WriteRename writeRename = new WriteRename( + fakeBqServices, + jobIdToken, + jsonTable, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED, + tempTablesView); + + DoFnTester<String, Void> tester = DoFnTester.of(writeRename); + tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); + tester.processElement(null); + + logged.verifyInfo("Starting BigQuery copy job"); + } }
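For reference, the user-facing API is unchanged by this commit; a bounded write still looks like the sketch below. The project, dataset and table names are placeholders, and the pipeline is assumed to be run with --tempLocation and --project set to valid values.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("word").setType("STRING"),
        new TableFieldSchema().setName("count").setType("INTEGER")));

    p.apply("CreateRows", Create.of(
            new TableRow().set("word", "beam").set("count", 1),
            new TableRow().set("word", "bigquery").set("count", 2))
        .withCoder(TableRowJsonCoder.of()))
        .apply("WriteToBigQuery", BigQueryIO.Write
            .to("my-project:my_dataset.my_table")  // placeholder table spec
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
  }
}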
