Repository: beam
Updated Branches:
  refs/heads/master 352f106f9 -> b2138b0d7

Move file deletion into subsequent ParDo.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d9b8f06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d9b8f06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d9b8f06

Branch: refs/heads/master
Commit: 6d9b8f06eeb9a2b0728f1dcf3c1e669bbd53959d
Parents: 352f106
Author: Reuven Lax <[email protected]>
Authored: Tue Sep 19 10:10:18 2017 -0700
Committer: Reuven Lax <[email protected]>
Committed: Tue Sep 26 10:40:33 2017 -0400

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/LocalFileSystem.java |   9 +-
 .../org/apache/beam/sdk/io/FileSystemsTest.java |  15 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  15 +-
 .../sdk/io/gcp/bigquery/TableDestination.java   |   5 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  28 ++-
 .../bigquery/WriteGroupedRecordsToFiles.java    |   7 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   | 190 +++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 122 +++++++-----
 .../sdk/io/gcp/bigquery/FakeJobService.java     |   5 +-
 9 files changed, 258 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 5fe894d..3891b91 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -34,6 +34,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -181,8 +182,12 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
   @Override
   protected void delete(Collection<LocalResourceId> resourceIds) throws IOException {
     for (LocalResourceId resourceId : resourceIds) {
-      LOG.debug("Deleting file {}", resourceId);
-      Files.delete(resourceId.getPath());
+      try {
+        Files.delete(resourceId.getPath());
+      } catch (NoSuchFileException e) {
+        LOG.info("Ignoring failed deletion of file {} which already does not exist: {}", resourceId,
+            e);
+      }
     }
   }
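
The LocalFileSystem change above is what makes the new cleanup path safe to retry: deleting a file that is already gone is logged and ignored rather than thrown. A minimal standalone sketch of the same idempotent-delete behavior, using only java.nio.file (the class name and paths below are illustrative, not part of the commit):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class TolerantDeleteDemo {

  /** Deletes each path, treating "already gone" as success so a retry is a no-op. */
  static void deleteAll(List<Path> paths) throws IOException {
    for (Path path : paths) {
      try {
        Files.delete(path);
      } catch (NoSuchFileException e) {
        // An earlier attempt (or another worker) already removed this file; ignore.
        System.out.println("Ignoring missing file: " + path);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path existing = Files.createTempFile("demo", ".tmp");
    Path missing = Paths.get(existing + ".does-not-exist"); // never created
    deleteAll(Arrays.asList(existing, missing)); // the missing entry is tolerated
  }
}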

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index a75c54d..3e393bf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -83,18 +83,6 @@ public class FileSystemsTest {
   }
 
   @Test
-  public void testDeleteThrowsNoSuchFileException() throws Exception {
-    Path existingPath = temporaryFolder.newFile().toPath();
-    Path nonExistentPath = existingPath.resolveSibling("non-existent");
-
-    createFileWithContent(existingPath, "content1");
-
-    thrown.expect(NoSuchFileException.class);
-    FileSystems.delete(
-        toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */));
-  }
-
-  @Test
   public void testDeleteIgnoreMissingFiles() throws Exception {
     Path existingPath = temporaryFolder.newFile().toPath();
     Path nonExistentPath = existingPath.resolveSibling("non-existent");
@@ -102,8 +90,7 @@ public class FileSystemsTest {
     createFileWithContent(existingPath, "content1");
 
     FileSystems.delete(
-        toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
-        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+        toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 76cf7e8..6d832e4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -216,7 +216,6 @@ class BatchLoads<DestinationT>
             .discardingFiredPanes());
     PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
         writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
-
     // Apply the user's trigger before we start generating BigQuery load jobs.
     results =
         results.apply(
@@ -480,15 +479,14 @@ class BatchLoads<DestinationT>
         .apply("MultiPartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
         .apply(
             "MultiPartitionsWriteTables",
-            ParDo.of(
-                new WriteTables<>(
+            new WriteTables<>(
                 false,
                 bigQueryServices,
                 jobIdTokenView,
                 WriteDisposition.WRITE_EMPTY,
                 CreateDisposition.CREATE_IF_NEEDED,
-                dynamicDestinations))
-                .withSideInputs(sideInputs));
+                sideInputs,
+                dynamicDestinations));
   }
 
   // In the case where the files fit into a single load job, there's no need to write temporary
@@ -510,15 +508,14 @@ class BatchLoads<DestinationT>
         .apply("SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
         .apply(
             "SinglePartitionWriteTables",
-            ParDo.of(
-                new WriteTables<>(
+            new WriteTables<>(
                 true,
                 bigQueryServices,
                 jobIdTokenView,
                 writeDisposition,
                 createDisposition,
-                dynamicDestinations))
-                .withSideInputs(sideInputs));
+                sideInputs,
+                dynamicDestinations));
   }
 
   private WriteResult writeResult(Pipeline p) {
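
The BatchLoads edits above replace a raw ParDo.of(new WriteTables<>(...)).withSideInputs(...) with a composite transform that takes its side inputs through the constructor and wires them up inside its own expand(). A sketch of that packaging pattern, assuming the Beam 2.x Java SDK (the transform and field names are invented for illustration):

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

/** Illustrative composite transform that owns its side inputs, like WriteTables. */
class SuffixWithSideInput extends PTransform<PCollection<String>, PCollection<String>> {
  private final PCollectionView<String> suffixView;
  private final List<PCollectionView<?>> sideInputs;

  SuffixWithSideInput(PCollectionView<String> suffixView, List<PCollectionView<?>> sideInputs) {
    this.suffixView = suffixView;
    this.sideInputs = sideInputs;
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // Callers no longer need to remember .withSideInputs(...); the transform
    // wires up its own side inputs inside expand().
    return input.apply(
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(c.element() + c.sideInput(suffixView));
                  }
                })
            .withSideInputs(sideInputs));
  }
}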

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index ecc34d3..ce2e7c7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -104,11 +104,12 @@ public class TableDestination implements Serializable {
     }
     TableDestination other = (TableDestination) o;
     return Objects.equals(this.tableSpec, other.tableSpec)
-        && Objects.equals(this.tableDescription, other.tableDescription);
+        && Objects.equals(this.tableDescription, other.tableDescription)
+        && Objects.equals(this.jsonTimePartitioning, other.jsonTimePartitioning);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(tableSpec, tableDescription);
+    return Objects.hash(tableSpec, tableDescription, jsonTimePartitioning);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index e337f94..017d5c1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -75,7 +76,7 @@ class WriteBundlesToFiles<DestinationT>
    * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
    * and encapsulates the table it is destined to as well as the file byte size.
    */
-  public static final class Result<DestinationT> implements Serializable {
+  static final class Result<DestinationT> implements Serializable {
     private static final long serialVersionUID = 1L;
     public final String filename;
     public final Long fileByteSize;
@@ -87,6 +88,31 @@ class WriteBundlesToFiles<DestinationT>
       this.fileByteSize = fileByteSize;
       this.destination = destination;
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof Result) {
+        Result<DestinationT> o = (Result<DestinationT>) other;
+        return Objects.equals(this.filename, o.filename)
+            && Objects.equals(this.fileByteSize, o.fileByteSize)
+            && Objects.equals(this.destination, o.destination);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(filename, fileByteSize, destination);
+    }
+
+    @Override
+    public String toString() {
+      return "Result{"
+          + "filename='" + filename + '\''
+          + ", fileByteSize=" + fileByteSize
+          + ", destination=" + destination
+          + '}';
+    }
   }
 
   /** a coder for the {@link Result} class. */
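
The TableDestination fix above matters because TableDestination is used as a map and grouping key: if equals() and hashCode() ignore jsonTimePartitioning, two destinations that differ only in partitioning collapse into one entry. A contrived sketch of the failure mode (the class and field names are illustrative, not Beam's):

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Illustrative: a key type must include every significant field in equals/hashCode. */
class DemoKey {
  final String tableSpec;
  final String timePartitioning; // imagine this were omitted from equals/hashCode

  DemoKey(String tableSpec, String timePartitioning) {
    this.tableSpec = tableSpec;
    this.timePartitioning = timePartitioning;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof DemoKey)) {
      return false;
    }
    DemoKey other = (DemoKey) o;
    // Including every significant field, as the TableDestination fix does:
    return Objects.equals(tableSpec, other.tableSpec)
        && Objects.equals(timePartitioning, other.timePartitioning);
  }

  @Override
  public int hashCode() {
    return Objects.hash(tableSpec, timePartitioning);
  }

  public static void main(String[] args) {
    Set<DemoKey> keys = new HashSet<>();
    keys.add(new DemoKey("dataset.table", "{\"type\":\"DAY\"}"));
    keys.add(new DemoKey("dataset.table", null));
    System.out.println(keys.size()); // 2; would print 1 if the field were ignored
  }
}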

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
index 887cb93..e82b29d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -48,18 +48,21 @@ class WriteGroupedRecordsToFiles<DestinationT>
   public void processElement(ProcessContext c) throws Exception {
     String tempFilePrefix = c.sideInput(this.tempFilePrefix);
     TableRowWriter writer = new TableRowWriter(tempFilePrefix);
-    try (TableRowWriter ignored = writer) {
+    try {
       for (TableRow tableRow : c.element().getValue()) {
         if (writer.getByteSize() > maxFileSize) {
           writer.close();
+          writer = new TableRowWriter(tempFilePrefix);
           TableRowWriter.Result result = writer.getResult();
           c.output(new WriteBundlesToFiles.Result<>(
               result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
-          writer = new TableRowWriter(tempFilePrefix);
         }
         writer.write(tableRow);
       }
-    }
+    } finally {
+      writer.close();
+    }
+
     TableRowWriter.Result result = writer.getResult();
     c.output(
         new WriteBundlesToFiles.Result<>(
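
WriteGroupedRecordsToFiles above rotates to a fresh writer once the current file passes maxFileSize, and closes the last writer in a finally block rather than via try-with-resources, since try-with-resources cannot manage a variable that is reassigned mid-loop. A self-contained sketch of that rotation pattern in plain Java (class and method names are illustrative):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Illustrative size-based writer rotation with close() guaranteed by finally. */
public class RotatingWriterDemo {
  public static List<Path> writeRotated(Path dir, List<String> records, long maxBytes)
      throws IOException {
    List<Path> outputs = new ArrayList<>();
    Path current = Files.createTempFile(dir, "shard", ".txt");
    BufferedWriter writer = Files.newBufferedWriter(current);
    long written = 0;
    try {
      for (String record : records) {
        if (written > maxBytes) {
          writer.close();          // finish the full shard...
          outputs.add(current);    // ...publish it...
          current = Files.createTempFile(dir, "shard", ".txt");
          writer = Files.newBufferedWriter(current); // ...and rotate to a fresh file
          written = 0;
        }
        writer.write(record);
        writer.newLine();
        written += record.length() + 1;
      }
    } finally {
      writer.close(); // try-with-resources can't be used: the variable is reassigned
    }
    outputs.add(current);
    return outputs;
  }
}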

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index a646f17..f8ed796 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -29,12 +29,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -42,9 +43,22 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.ShardedKey;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +75,8 @@ import org.slf4j.LoggerFactory;
  * {@link KV} maps the final table to itself.
  */
 class WriteTables<DestinationT>
-    extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
+    extends PTransform<PCollection<KV<ShardedKey<DestinationT>, List<String>>>,
+        PCollection<KV<TableDestination, String>>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
   private final boolean singlePartition;
@@ -70,7 +85,84 @@ class WriteTables<DestinationT>
   private final WriteDisposition firstPaneWriteDisposition;
   private final CreateDisposition firstPaneCreateDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
-  private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
+  private final List<PCollectionView<?>> sideInputs;
+  private final TupleTag<KV<TableDestination, String>> mainOutputTag;
+  private final TupleTag<String> temporaryFilesTag;
+
+
+  private class WriteTablesDoFn
+      extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
+    private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
+
+    @StartBundle
+    public void startBundle(StartBundleContext c) {
+      // Clear the map on each bundle so we can notice side-input updates.
+      // (alternative is to use a cache with a TTL).
+      jsonSchemas.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+      DestinationT destination = c.element().getKey().getKey();
+      TableSchema tableSchema;
+      String jsonSchema = jsonSchemas.get(destination);
+      if (jsonSchema != null) {
+        tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+      } else {
+        tableSchema = dynamicDestinations.getSchema(destination);
+        if (tableSchema != null) {
+          jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
+        }
+      }
+
+      TableDestination tableDestination = dynamicDestinations.getTable(destination);
+      TableReference tableReference = tableDestination.getTableReference();
+      if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+        tableReference.setProjectId(
+            c.getPipelineOptions().as(BigQueryOptions.class).getProject());
+        tableDestination = new TableDestination(
+            tableReference, tableDestination.getTableDescription());
+      }
+
+      Integer partition = c.element().getKey().getShardNumber();
+      List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
+      String jobIdPrefix = BigQueryHelpers.createJobId(
+          c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex());
+
+      if (!singlePartition) {
+        tableReference.setTableId(jobIdPrefix);
+      }
+
+      WriteDisposition writeDisposition =
+          (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+      CreateDisposition createDisposition =
+          (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+      load(
+          bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+          bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+          jobIdPrefix,
+          tableReference,
+          tableDestination.getTimePartitioning(),
+          tableSchema,
+          partitionFiles,
+          writeDisposition,
+          createDisposition,
+          tableDestination.getTableDescription());
+      c.output(
+          mainOutputTag, KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
+      for (String file : partitionFiles) {
+        c.output(temporaryFilesTag, file);
+      }
+    }
+  }
+
+  private class GarbageCollectTemporaryFiles extends DoFn<Iterable<String>, Void> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      removeTemporaryFiles(c.element());
+    }
+  }
 
   public WriteTables(
       boolean singlePartition,
@@ -78,74 +170,48 @@ class WriteTables<DestinationT>
       PCollectionView<String> jobIdToken,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
+      List<PCollectionView<?>> sideInputs,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.firstPaneWriteDisposition = writeDisposition;
     this.firstPaneCreateDisposition = createDisposition;
+    this.sideInputs = sideInputs;
     this.dynamicDestinations = dynamicDestinations;
+    this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput");
+    this.temporaryFilesTag = new TupleTag<>("TemporaryFiles");
   }
 
-  @StartBundle
-  public void startBundle(StartBundleContext c) {
-    // Clear the map on each bundle so we can notice side-input updates.
-    // (alternative is to use a cache with a TTL).
-    jsonSchemas.clear();
-  }
+  @Override
+  public PCollection<KV<TableDestination, String>> expand(
+      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input) {
+    PCollectionTuple writeTablesOutputs = input.apply(ParDo.of(new WriteTablesDoFn())
+        .withSideInputs(sideInputs)
+        .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag)));
 
-  @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
-    dynamicDestinations.setSideInputAccessorFromProcessContext(c);
-    DestinationT destination = c.element().getKey().getKey();
-    TableSchema tableSchema;
-    String jsonSchema = jsonSchemas.get(destination);
-    if (jsonSchema != null) {
-      tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
-    } else {
-      tableSchema = dynamicDestinations.getSchema(destination);
-      if (tableSchema != null) {
-        jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
-      }
-    }
-
-    TableDestination tableDestination = dynamicDestinations.getTable(destination);
-    TableReference tableReference = tableDestination.getTableReference();
-    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
-      tableReference.setProjectId(
-          c.getPipelineOptions().as(BigQueryOptions.class).getProject());
-      tableDestination = new TableDestination(
-          tableReference, tableDestination.getTableDescription());
-    }
+    // Garbage collect temporary files.
+    // We mustn't start garbage collecting files until we are assured that the WriteTablesDoFn has
+    // succeeded in loading those files and won't be retried. Otherwise, we might fail part of the
+    // way through deleting temporary files, and retry WriteTablesDoFn. This will then fail due
+    // to missing files, causing either the entire workflow to fail or get stuck (depending on how
+    // the runner handles persistent failures).
+    writeTablesOutputs
+        .get(temporaryFilesTag)
+        .setCoder(StringUtf8Coder.of())
+        .apply(WithKeys.<Void, String>of((Void) null))
+        .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
+        .apply(Window.<KV<Void, String>>into(new GlobalWindows())
+            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+            .discardingFiredPanes())
+        .apply(GroupByKey.<Void, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(ParDo.of(new GarbageCollectTemporaryFiles()));
 
-    Integer partition = c.element().getKey().getShardNumber();
-    List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
-    String jobIdPrefix = BigQueryHelpers.createJobId(
-        c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex());
+    return writeTablesOutputs.get(mainOutputTag);
+  }
 
-    if (!singlePartition) {
-      tableReference.setTableId(jobIdPrefix);
-    }
-    WriteDisposition writeDisposition =
-        (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
-    CreateDisposition createDisposition =
-        (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
-    load(
-        bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        jobIdPrefix,
-        tableReference,
-        tableDestination.getTimePartitioning(),
-        tableSchema,
-        partitionFiles,
-        writeDisposition,
-        createDisposition,
-        tableDestination.getTableDescription());
-    c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
-
-    removeTemporaryFiles(partitionFiles);
-  }
 
   private void load(
       JobService jobService,
@@ -208,11 +274,11 @@ class WriteTables<DestinationT>
               BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
   }
 
-  static void removeTemporaryFiles(Collection<String> files) throws IOException {
+  static void removeTemporaryFiles(Iterable<String> files) throws IOException {
     ImmutableList.Builder<ResourceId> fileResources = ImmutableList.builder();
-    for (String file: files) {
+    for (String file : files) {
       fileResources.add(FileSystems.matchNewResource(file, false/* isDirectory */));
     }
-    FileSystems.delete(fileResources.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    FileSystems.delete(fileResources.build());
   }
 }
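
The expand() above is the heart of the commit: temporary filenames leave WriteTablesDoFn on a secondary output, and only reach the deleting ParDo after a GroupByKey, which acts as a checkpoint barrier, so files are removed only once the load that used them has committed and will not be retried. A distilled sketch of that barrier-then-cleanup wiring, assuming the Beam 2.x Java SDK, with the actual FileSystems.delete call stubbed out:

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

/** Illustrative cleanup stage: delete temp files only after the GroupByKey barrier. */
class CleanupTemporaryFiles extends PTransform<PCollection<String>, PDone> {
  @Override
  public PDone expand(PCollection<String> tempFiles) {
    tempFiles
        // Funnel every filename onto a single key so each pane arrives as one group.
        .apply(WithKeys.<Void, String>of((Void) null))
        .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
        // Fire eagerly; the GroupByKey still guarantees the producing pane committed.
        .apply(Window.<KV<Void, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
        .apply(GroupByKey.<Void, String>create())
        .apply(Values.<Iterable<String>>create())
        .apply(ParDo.of(new DoFn<Iterable<String>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            for (String file : c.element()) {
              // The real transform calls FileSystems.delete here; deletion is
              // idempotent (see the LocalFileSystem change), so bundle retries are safe.
              System.out.println("Would delete " + file);
            }
          }
        }));
    return PDone.in(tempFiles.getPipeline());
  }
}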

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index ad4cbaa..5500b12 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -63,9 +63,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.math.BigDecimal;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -81,16 +78,13 @@ import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
@@ -130,7 +124,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -489,6 +482,40 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testWriteEmptyPCollection() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService())
+        .withDatasetService(datasetService);
+
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    Pipeline p = TestPipeline.create(bqOptions);
+
+    TableSchema schema = new TableSchema()
+        .setFields(
+            ImmutableList.of(
+                new TableFieldSchema().setName("number").setType("INTEGER")));
+
+    p.apply(Create.empty(TableRowJsonCoder.of()))
+        .apply(BigQueryIO.writeTableRows()
+            .to("project-id:dataset-id.table-id")
+            .withTestServices(fakeBqServices)
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withSchema(schema)
+            .withoutValidation());
+    p.run();
+    checkNotNull(datasetService.getTable(
+        BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id")));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
+  }
+
+  @Test
   public void testWriteDynamicDestinationsBatch() throws Exception {
     writeDynamicDestinations(false);
   }
@@ -635,6 +662,7 @@ public class BigQueryIOTest implements Serializable {
       assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()),
           containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
     }
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -684,6 +712,7 @@ public class BigQueryIOTest implements Serializable {
         BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"));
     assertEquals(schema, table.getSchema());
     assertEquals(timePartitioning, table.getTimePartitioning());
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -737,6 +766,7 @@ public class BigQueryIOTest implements Serializable {
     assertThat(
         datasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -836,6 +866,7 @@ public class BigQueryIOTest implements Serializable {
     // Only row1 and row3 were successfully inserted.
     assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(row1, row3));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -908,6 +939,7 @@ public class BigQueryIOTest implements Serializable {
             new TableRow().set("name", "b").set("number", 2),
             new TableRow().set("name", "c").set("number", 3),
             new TableRow().set("name", "d").set("number", 4)));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   /**
@@ -1128,6 +1160,7 @@ public class BigQueryIOTest implements Serializable {
               new TableRow().set("name", String.format("number%d", i)).set("number", i),
               new TableRow().set("name", String.format("number%d", i + 5)).set("number", i + 5)));
     }
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -1142,6 +1175,7 @@ public class BigQueryIOTest implements Serializable {
         .withDatasetService(datasetService);
     datasetService.createDataset("project-id", "dataset-id", "", "");
     Pipeline p = TestPipeline.create(bqOptions);
+
     p.apply(Create.of(
         new TableRow().set("name", "a").set("number", 1),
         new TableRow().set("name", "b").set("number", 2),
@@ -1160,6 +1194,7 @@ public class BigQueryIOTest implements Serializable {
       File tempDir = new File(bqOptions.getTempLocation());
       testNumFiles(tempDir, 0);
     }
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -2029,19 +2064,23 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteTables() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
     datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    Pipeline p = TestPipeline.create(bqOptions);
+
     long numTables = 3;
     long numPartitions = 3;
    long numFilesPerPartition = 10;
-    String jobIdToken = "jobIdToken";
-    String stepUuid = "stepUuid";
-    Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap();
+    String jobIdToken = "jobId";
+    final Multimap<TableDestination, String> expectedTempTables = ArrayListMultimap.create();
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
 
@@ -2055,35 +2094,29 @@ public class BigQueryIOTest implements Serializable {
         for (int k = 0; k < numFilesPerPartition; ++k) {
           String filename = Paths.get(baseDir.toString(),
               String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
-          ResourceId fileResource =
-              FileSystems.matchNewResource(filename, false /* isDirectory */);
-          try (WritableByteChannel channel = FileSystems.create(fileResource, MimeTypes.TEXT)) {
-            try (OutputStream output = Channels.newOutputStream(channel)) {
-              TableRow tableRow = new TableRow().set("name", tableName);
-              TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
-              output.write("\n".getBytes(StandardCharsets.UTF_8));
-            }
+          TableRowWriter writer = new TableRowWriter(filename);
+          try (TableRowWriter ignored = writer) {
+            TableRow tableRow = new TableRow().set("name", tableName);
+            writer.write(tableRow);
           }
-          filesPerPartition.add(filename);
+          filesPerPartition.add(writer.getResult().resourceId.toString());
         }
         partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j),
             filesPerPartition));
 
-        List<String> expectedTables = expectedTempTables.get(tableDestination);
-        if (expectedTables == null) {
-          expectedTables = Lists.newArrayList();
-          expectedTempTables.put(tableDestination, expectedTables);
-        }
         String json = String.format(
             "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}",
             tempTableId);
-        expectedTables.add(json);
+        expectedTempTables.put(tableDestination, json);
       }
     }
 
+    PCollection<KV<ShardedKey<String>, List<String>>> writeTablesInput =
+        p.apply(Create.of(partitions));
     PCollectionView<String> jobIdTokenView = p
         .apply("CreateJobId", Create.of("jobId"))
         .apply(View.<String>asSingleton());
+    List<PCollectionView<?>> sideInputs = ImmutableList.<PCollectionView<?>>of(jobIdTokenView);
 
     WriteTables<String> writeTables =
         new WriteTables<>(
@@ -2092,26 +2125,29 @@ public class BigQueryIOTest implements Serializable {
             jobIdTokenView,
             WriteDisposition.WRITE_EMPTY,
             CreateDisposition.CREATE_IF_NEEDED,
+            sideInputs,
             new IdentityDynamicTables());
 
-    DoFnTester<KV<ShardedKey<String>, List<String>>,
-        KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
-    tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    tester.getPipelineOptions().setTempLocation("tempLocation");
-    for (KV<ShardedKey<String>, List<String>> partition : partitions) {
-      tester.processElement(partition);
-    }
+    PCollection<KV<TableDestination, String>> writeTablesOutput =
+        writeTablesInput.apply(writeTables);
 
-    Map<TableDestination, List<String>> tempTablesResult = Maps.newHashMap();
-    for (KV<TableDestination, String> element : tester.takeOutputElements()) {
-      List<String> tables = tempTablesResult.get(element.getKey());
-      if (tables == null) {
-        tables = Lists.newArrayList();
-        tempTablesResult.put(element.getKey(), tables);
-      }
-      tables.add(element.getValue());
-    }
-    assertEquals(expectedTempTables, tempTablesResult);
+    PAssert.thatMultimap(writeTablesOutput)
+        .satisfies(
+            new SerializableFunction<Map<TableDestination, Iterable<String>>, Void>() {
+              @Override
+              public Void apply(Map<TableDestination, Iterable<String>> input) {
+                assertEquals(input.keySet(), expectedTempTables.keySet());
+                for (Map.Entry<TableDestination, Iterable<String>> entry : input.entrySet()) {
+                  @SuppressWarnings("unchecked")
+                  String[] expectedValues = Iterables.toArray(
+                      expectedTempTables.get(entry.getKey()), String.class);
+                  assertThat(entry.getValue(), containsInAnyOrder(expectedValues));
+                }
+                return null;
+              }
+            });
+    p.run();
+    testNumFiles(baseDir.toFile(), 0);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index cc600d1..f13a7ab 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -63,7 +63,6 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -129,8 +128,7 @@ class FakeJobService implements JobService, Serializable {
             filename + ThreadLocalRandom.current().nextInt(), false /* isDirectory */));
       }
-      FileSystems.copy(sourceFiles.build(), loadFiles.build(),
-          MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+      FileSystems.copy(sourceFiles.build(), loadFiles.build());
       filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles.build());
     }
 
@@ -325,6 +323,7 @@ class FakeJobService implements JobService, Serializable {
       rows.addAll(readRows(filename.toString()));
     }
     datasetService.insertAll(destination, rows, null);
+    FileSystems.delete(sourceFiles);
     return new JobStatus().setState("DONE");
   }
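
With FakeJobService now deleting its input files once a load job completes, the tests can assert that no temporary files survive a successful run; that is what the repeated testNumFiles(new File(bqOptions.getTempLocation()), 0) calls check. The helper itself is not part of this diff; a plausible shape for it, under that assumption, would be:

import static org.junit.Assert.assertEquals;

import java.io.File;

/** Hypothetical reconstruction of the test helper; the real one lives in BigQueryIOTest. */
class FileCountAssertions {
  static void testNumFiles(File tempDir, int expectedNumFiles) {
    File[] files = tempDir.listFiles(); // may be null if the directory no longer exists
    assertEquals(expectedNumFiles, files == null ? 0 : files.length);
  }
}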
