Adds DynamicDestinations support to FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c336e84 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c336e84 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c336e84 Branch: refs/heads/DSL_SQL Commit: 4c336e840e69e83e15d9ffb7e0a0178dd3ab8404 Parents: 1f6117f Author: Reuven Lax <[email protected]> Authored: Fri Jun 9 17:11:32 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:01 2017 -0700 ---------------------------------------------------------------------- .../examples/common/WriteOneFilePerWindow.java | 52 +- .../beam/examples/WindowedWordCountIT.java | 4 +- .../complete/game/utils/WriteToText.java | 43 +- .../construction/WriteFilesTranslation.java | 67 +- .../construction/PTransformMatchersTest.java | 22 +- .../construction/WriteFilesTranslationTest.java | 62 +- .../direct/WriteWithShardingFactory.java | 6 +- .../direct/WriteWithShardingFactoryTest.java | 18 +- .../beam/runners/dataflow/DataflowRunner.java | 15 +- .../runners/dataflow/DataflowRunnerTest.java | 35 +- .../runners/spark/SparkRunnerDebuggerTest.java | 26 +- .../src/main/proto/beam_runner_api.proto | 7 +- .../apache/beam/sdk/coders/ShardedKeyCoder.java | 66 ++ .../java/org/apache/beam/sdk/io/AvroIO.java | 220 ++++--- .../java/org/apache/beam/sdk/io/AvroSink.java | 32 +- .../beam/sdk/io/DefaultFilenamePolicy.java | 274 +++++--- .../beam/sdk/io/DynamicFileDestinations.java | 115 ++++ .../org/apache/beam/sdk/io/FileBasedSink.java | 513 ++++++++------- .../java/org/apache/beam/sdk/io/TFRecordIO.java | 44 +- .../java/org/apache/beam/sdk/io/TextIO.java | 488 ++++++++++---- .../java/org/apache/beam/sdk/io/TextSink.java | 22 +- .../java/org/apache/beam/sdk/io/WriteFiles.java | 640 +++++++++++-------- .../sdk/transforms/SerializableFunctions.java | 50 ++ .../org/apache/beam/sdk/values/ShardedKey.java | 65 ++ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 85 ++- 
.../beam/sdk/io/DefaultFilenamePolicyTest.java | 135 ++-- .../sdk/io/DrunkWritableByteChannelFactory.java | 2 +- .../apache/beam/sdk/io/FileBasedSinkTest.java | 93 +-- .../java/org/apache/beam/sdk/io/SimpleSink.java | 56 +- .../java/org/apache/beam/sdk/io/TextIOTest.java | 264 +++++++- .../org/apache/beam/sdk/io/WriteFilesTest.java | 339 ++++++++-- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 + .../io/gcp/bigquery/DynamicDestinations.java | 29 +- .../io/gcp/bigquery/GenerateShardedTable.java | 1 + .../beam/sdk/io/gcp/bigquery/ShardedKey.java | 67 -- .../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 74 --- .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 1 + .../io/gcp/bigquery/StreamingWriteTables.java | 2 + .../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 1 + .../io/gcp/bigquery/WriteBundlesToFiles.java | 2 + .../bigquery/WriteGroupedRecordsToFiles.java | 1 + .../sdk/io/gcp/bigquery/WritePartition.java | 1 + .../beam/sdk/io/gcp/bigquery/WriteTables.java | 1 + .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 + .../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +- .../org/apache/beam/sdk/io/xml/XmlSink.java | 21 +- .../org/apache/beam/sdk/io/xml/XmlSinkTest.java | 4 +- 47 files changed, 2710 insertions(+), 1363 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java index 5e6df9c..49865ba 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -17,11 +17,12 @@ */ package org.apache.beam.examples.common; 
-import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.base.MoreObjects.firstNonNull; import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; @@ -53,22 +54,12 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone @Override public PDone expand(PCollection<String> input) { - // filenamePrefix may contain a directory and a filename component. Pull out only the filename - // component from that path for the PerWindowFiles. - String prefix = ""; ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix); - if (!resource.isDirectory()) { - prefix = verifyNotNull( - resource.getFilename(), - "A non-directory resource should have a non-null filename: %s", - resource); - } - - - TextIO.Write write = TextIO.write() - .to(resource.getCurrentDirectory()) - .withFilenamePolicy(new PerWindowFiles(prefix)) - .withWindowedWrites(); + TextIO.Write write = + TextIO.write() + .to(new PerWindowFiles(resource)) + .withTempDirectory(resource.getCurrentDirectory()) + .withWindowedWrites(); if (numShards != null) { write = write.withNumShards(numShards); } @@ -83,31 +74,36 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone */ public static class PerWindowFiles extends FilenamePolicy { - private final String prefix; + private final ResourceId baseFilename; - public PerWindowFiles(String prefix) { - this.prefix = prefix; + public PerWindowFiles(ResourceId baseFilename) { + this.baseFilename = baseFilename; } public String filenamePrefixForWindow(IntervalWindow window) { + String prefix = + baseFilename.isDirectory() ? 
"" : firstNonNull(baseFilename.getFilename(), ""); return String.format("%s-%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); } @Override - public ResourceId windowedFilename( - ResourceId outputDirectory, WindowedContext context, String extension) { + public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { IntervalWindow window = (IntervalWindow) context.getWindow(); - String filename = String.format( - "%s-%s-of-%s%s", - filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), - extension); - return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + String filename = + String.format( + "%s-%s-of-%s%s", + filenamePrefixForWindow(window), + context.getShardNumber(), + context.getNumShards(), + outputFileHints.getSuggestedFilenameSuffix()); + return baseFilename + .getCurrentDirectory() + .resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename( - ResourceId outputDirectory, Context context, String extension) { + public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Unsupported."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index eb7e4c4..bec7952 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.examples.common.ExampleUtils; import 
org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -149,7 +150,8 @@ public class WindowedWordCountIT { String outputPrefix = options.getOutput(); - PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix); + PerWindowFiles filenamePolicy = + new PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)); List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6); http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java index e6c8ddb..1d60198 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java @@ -18,7 +18,6 @@ package org.apache.beam.examples.complete.game.utils; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Verify.verifyNotNull; import java.io.Serializable; import java.util.ArrayList; @@ -28,6 +27,7 @@ import java.util.TimeZone; import java.util.stream.Collectors; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; @@ 
-111,21 +111,12 @@ public class WriteToText<InputT> checkArgument( input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder()); - // filenamePrefix may contain a directory and a filename component. Pull out only the filename - // component from that path for the PerWindowFiles. - String prefix = ""; ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix); - if (!resource.isDirectory()) { - prefix = verifyNotNull( - resource.getFilename(), - "A non-directory resource should have a non-null filename: %s", - resource); - } return input.apply( TextIO.write() - .to(resource.getCurrentDirectory()) - .withFilenamePolicy(new PerWindowFiles(prefix)) + .to(new PerWindowFiles(resource)) + .withTempDirectory(resource.getCurrentDirectory()) .withWindowedWrites() .withNumShards(3)); } @@ -139,31 +130,33 @@ public class WriteToText<InputT> */ protected static class PerWindowFiles extends FilenamePolicy { - private final String prefix; + private final ResourceId prefix; - public PerWindowFiles(String prefix) { + public PerWindowFiles(ResourceId prefix) { this.prefix = prefix; } public String filenamePrefixForWindow(IntervalWindow window) { - return String.format("%s-%s-%s", - prefix, formatter.print(window.start()), formatter.print(window.end())); + String filePrefix = prefix.isDirectory() ? 
"" : prefix.getFilename(); + return String.format( + "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end())); } @Override - public ResourceId windowedFilename( - ResourceId outputDirectory, WindowedContext context, String extension) { + public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { IntervalWindow window = (IntervalWindow) context.getWindow(); - String filename = String.format( - "%s-%s-of-%s%s", - filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), - extension); - return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + String filename = + String.format( + "%s-%s-of-%s%s", + filenamePrefixForWindow(window), + context.getShardNumber(), + context.getNumShards(), + outputFileHints.getSuggestedFilenameSuffix()); + return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename( - ResourceId outputDirectory, Context context, String extension) { + public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Unsupported."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index 99b77ef..b1d2da4 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -26,6 +26,7 @@ import 
com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import java.io.IOException; +import java.io.Serializable; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; @@ -37,6 +38,7 @@ import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -51,32 +53,45 @@ public class WriteFilesTranslation { public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN = "urn:beam:file_based_sink:javasdk:0.1"; + public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN = + "urn:beam:file_based_sink_format_function:javasdk:0.1"; + @VisibleForTesting - static WriteFilesPayload toProto(WriteFiles<?> transform) { + static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) { return WriteFilesPayload.newBuilder() .setSink(toProto(transform.getSink())) + .setFormatFunction(toProto(transform.getFormatFunction())) .setWindowedWrites(transform.isWindowedWrites()) .setRunnerDeterminedSharding( transform.getNumShards() == null && transform.getSharding() == null) .build(); } - private static SdkFunctionSpec toProto(FileBasedSink<?> sink) { + private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) { + return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink); + } + + private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) { + return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction); + } + + private static SdkFunctionSpec toProto(String urn, Serializable serializable) { return SdkFunctionSpec.newBuilder() .setSpec( 
FunctionSpec.newBuilder() - .setUrn(CUSTOM_JAVA_FILE_BASED_SINK_URN) + .setUrn(urn) .setParameter( Any.pack( BytesValue.newBuilder() .setValue( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(sink))) + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(serializable))) .build()))) .build(); } @VisibleForTesting - static FileBasedSink<?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException { + static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException { checkArgument( sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN), "Cannot extract %s instance from %s with URN %s", @@ -87,16 +102,44 @@ public class WriteFilesTranslation { byte[] serializedSink = sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); - return (FileBasedSink<?>) + return (FileBasedSink<?, ?>) SerializableUtils.deserializeFromByteArray( serializedSink, FileBasedSink.class.getSimpleName()); } - public static <T> FileBasedSink<T> getSink( - AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>> + @VisibleForTesting + static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto( + SdkFunctionSpec sinkProto) throws IOException { + checkArgument( + sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN), + "Cannot extract %s instance from %s with URN %s", + SerializableFunction.class.getSimpleName(), + FunctionSpec.class.getSimpleName(), + sinkProto.getSpec().getUrn()); + + byte[] serializedFunction = + sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); + + return (SerializableFunction<InputT, OutputT>) + SerializableUtils.deserializeFromByteArray( + serializedFunction, FileBasedSink.class.getSimpleName()); + } + + public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink( + AppliedPTransform<PCollection<UserT>, PDone, ? 
extends PTransform<PCollection<UserT>, PDone>> + transform) + throws IOException { + return (FileBasedSink<OutputT, DestinationT>) + sinkFromProto(getWriteFilesPayload(transform).getSink()); + } + + public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction( + AppliedPTransform< + PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>> transform) throws IOException { - return (FileBasedSink<T>) sinkFromProto(getWriteFilesPayload(transform).getSink()); + return formatFunctionFromProto( + getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction()); } public static <T> boolean isWindowedWrites( @@ -124,15 +167,15 @@ public class WriteFilesTranslation { .unpack(WriteFilesPayload.class); } - static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?>> { + static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> { @Override - public String getUrn(WriteFiles<?> transform) { + public String getUrn(WriteFiles<?, ?, ?> transform) { return PTransformTranslation.WRITE_FILES_TRANSFORM_URN; } @Override public FunctionSpec translate( - AppliedPTransform<?, ?, WriteFiles<?>> transform, SdkComponents components) { + AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) { return FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) .setParameter(Any.pack(toProto(transform.getTransform()))) http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 6459849..99d3dd1 100644 --- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.DefaultFilenamePolicy; +import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.LocalResources; @@ -55,6 +56,7 @@ import org.apache.beam.sdk.transforms.Materialization; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; @@ -537,30 +539,32 @@ public class PTransformMatchersTest implements Serializable { public void writeWithRunnerDeterminedSharding() { ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */); FilenamePolicy policy = - DefaultFilenamePolicy.constructUsingStandardParameters( + DefaultFilenamePolicy.fromStandardParameters( StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE, "", false); - WriteFiles<Integer> write = + WriteFiles<Integer, Void, Integer> write = WriteFiles.to( - new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) { + new FileBasedSink<Integer, Void>( + StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) { @Override - public WriteOperation<Integer> createWriteOperation() { + public WriteOperation<Integer, Void> createWriteOperation() { return null; 
} - }); + }, + SerializableFunctions.<Integer>identity()); assertThat( PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)), is(true)); - WriteFiles<Integer> withStaticSharding = write.withNumShards(3); + WriteFiles<Integer, Void, Integer> withStaticSharding = write.withNumShards(3); assertThat( PTransformMatchers.writeWithRunnerDeterminedSharding() .matches(appliedWrite(withStaticSharding)), is(false)); - WriteFiles<Integer> withCustomSharding = + WriteFiles<Integer, Void, Integer> withCustomSharding = write.withSharding(Sum.integersGlobally().asSingletonView()); assertThat( PTransformMatchers.writeWithRunnerDeterminedSharding() @@ -568,8 +572,8 @@ public class PTransformMatchersTest implements Serializable { is(false)); } - private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) { - return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of( + private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer, Void, Integer> write) { + return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer, Void, Integer>>of( "WriteFiles", Collections.<TupleTag<?>, PValue>emptyMap(), Collections.<TupleTag<?>, PValue>emptyMap(), http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index 739034c..283df16 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ 
-26,8 +26,10 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.fs.ResourceId; @@ -36,6 +38,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.junit.Test; @@ -56,16 +60,17 @@ public class WriteFilesTranslationTest { @RunWith(Parameterized.class) public static class TestWriteFilesPayloadTranslation { @Parameters(name = "{index}: {0}") - public static Iterable<WriteFiles<?>> data() { - return ImmutableList.<WriteFiles<?>>of( - WriteFiles.to(new DummySink()), - WriteFiles.to(new DummySink()).withWindowedWrites(), - WriteFiles.to(new DummySink()).withNumShards(17), - WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42)); + public static Iterable<WriteFiles<Object, Void, Object>> data() { + SerializableFunction<Object, Object> format = SerializableFunctions.constant(null); + return ImmutableList.of( + WriteFiles.to(new DummySink(), format), + WriteFiles.to(new DummySink(), format).withWindowedWrites(), + WriteFiles.to(new DummySink(), format).withNumShards(17), + WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42)); } @Parameter(0) - public WriteFiles<String> writeFiles; + public WriteFiles<String, 
Void, String> writeFiles; public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @@ -80,7 +85,7 @@ public class WriteFilesTranslationTest { assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites())); assertThat( - (FileBasedSink<String>) WriteFilesTranslation.sinkFromProto(payload.getSink()), + (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()), equalTo(writeFiles.getSink())); } @@ -89,9 +94,9 @@ public class WriteFilesTranslationTest { PCollection<String> input = p.apply(Create.of("hello")); PDone output = input.apply(writeFiles); - AppliedPTransform<PCollection<String>, PDone, WriteFiles<String>> appliedPTransform = - AppliedPTransform.<PCollection<String>, PDone, WriteFiles<String>>of( - "foo", input.expand(), output.expand(), writeFiles, p); + AppliedPTransform<PCollection<String>, PDone, WriteFiles<String, Void, String>> + appliedPTransform = + AppliedPTransform.of("foo", input.expand(), output.expand(), writeFiles, p); assertThat( WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform), @@ -101,7 +106,9 @@ public class WriteFilesTranslationTest { WriteFilesTranslation.isWindowedWrites(appliedPTransform), equalTo(writeFiles.isWindowedWrites())); - assertThat(WriteFilesTranslation.getSink(appliedPTransform), equalTo(writeFiles.getSink())); + assertThat( + WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform), + equalTo(writeFiles.getSink())); } } @@ -109,16 +116,16 @@ public class WriteFilesTranslationTest { * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid * any issues serializing mocks. 
*/ - private static class DummySink extends FileBasedSink<String> { + private static class DummySink extends FileBasedSink<Object, Void> { DummySink() { super( StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)), - new DummyFilenamePolicy()); + DynamicFileDestinations.constant(new DummyFilenamePolicy())); } @Override - public WriteOperation<String> createWriteOperation() { + public WriteOperation<Object, Void> createWriteOperation() { return new DummyWriteOperation(this); } @@ -130,46 +137,39 @@ public class WriteFilesTranslationTest { DummySink that = (DummySink) other; - return getFilenamePolicy().equals(((DummySink) other).getFilenamePolicy()) - && getBaseOutputDirectoryProvider().isAccessible() - && that.getBaseOutputDirectoryProvider().isAccessible() - && getBaseOutputDirectoryProvider() - .get() - .equals(that.getBaseOutputDirectoryProvider().get()); + return getTempDirectoryProvider().isAccessible() + && that.getTempDirectoryProvider().isAccessible() + && getTempDirectoryProvider().get().equals(that.getTempDirectoryProvider().get()); } @Override public int hashCode() { return Objects.hash( DummySink.class, - getFilenamePolicy(), - getBaseOutputDirectoryProvider().isAccessible() - ? getBaseOutputDirectoryProvider().get() - : null); + getTempDirectoryProvider().isAccessible() ? 
getTempDirectoryProvider().get() : null); } } - private static class DummyWriteOperation extends FileBasedSink.WriteOperation<String> { - public DummyWriteOperation(FileBasedSink<String> sink) { + private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> { + public DummyWriteOperation(FileBasedSink<Object, Void> sink) { super(sink); } @Override - public FileBasedSink.Writer<String> createWriter() throws Exception { + public FileBasedSink.Writer<Object, Void> createWriter() throws Exception { throw new UnsupportedOperationException("Should never be called."); } } private static class DummyFilenamePolicy extends FilenamePolicy { @Override - public ResourceId windowedFilename( - ResourceId outputDirectory, WindowedContext c, String extension) { + public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Should never be called."); } @Nullable @Override - public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) { + public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Should never be called."); } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index d8734a1..ba796ae 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -60,9 +60,11 @@ class WriteWithShardingFactory<InputT> public PTransformReplacement<PCollection<InputT>, PDone> 
getReplacementTransform( AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>> transform) { - try { - WriteFiles<InputT> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform)); + WriteFiles<InputT, ?, ?> replacement = + WriteFiles.to( + WriteFilesTranslation.getSink(transform), + WriteFilesTranslation.getFormatFunction(transform)); if (WriteFilesTranslation.isWindowedWrites(transform)) { replacement = replacement.withWindowedWrites(); } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 41d671f..546a181 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -39,9 +39,8 @@ import java.util.UUID; import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.DefaultFilenamePolicy; +import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.LocalResources; import org.apache.beam.sdk.io.TextIO; @@ -55,6 +54,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunctions; 
import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -137,21 +137,17 @@ public class WriteWithShardingFactoryTest implements Serializable { @Test public void withNoShardingSpecifiedReturnsNewTransform() { ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */); - FilenamePolicy policy = - DefaultFilenamePolicy.constructUsingStandardParameters( - StaticValueProvider.of(outputDirectory), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE, - "", - false); PTransform<PCollection<Object>, PDone> original = WriteFiles.to( - new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) { + new FileBasedSink<Object, Void>( + StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) { @Override - public WriteOperation<Object> createWriteOperation() { + public WriteOperation<Object, Void> createWriteOperation() { throw new IllegalArgumentException("Should not be used"); } - }); + }, + SerializableFunctions.identity()); @SuppressWarnings("unchecked") PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5d9f0f3..8935759 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1455,8 +1455,9 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> { } @VisibleForTesting - static class StreamingShardedWriteFactory<T> - implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> { + static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT> + implements PTransformOverrideFactory< + PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>> { // We pick 10 as a a default, as it works well with the default number of workers started // by Dataflow. static final int DEFAULT_NUM_SHARDS = 10; @@ -1467,8 +1468,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } @Override - public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform( - AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) { + public PTransformReplacement<PCollection<UserT>, PDone> getReplacementTransform( + AppliedPTransform<PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>> + transform) { // By default, if numShards is not set WriteFiles will produce one file per bundle. In // streaming, there are large numbers of small bundles, resulting in many tiny files. // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files. 
@@ -1485,7 +1487,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } try { - WriteFiles<T> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform)); + WriteFiles<UserT, DestinationT, OutputT> replacement = + WriteFiles.<UserT, DestinationT, OutputT>to( + WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform), + WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform)); if (WriteFilesTranslation.isWindowedWrites(transform)) { replacement = replacement.withWindowedWrites(); } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index bc1a042..94985f8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -76,6 +76,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; +import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; @@ -100,6 +101,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunctions; import 
org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.GcsUtil; @@ -1263,30 +1265,39 @@ public class DataflowRunnerTest implements Serializable { private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) { TestPipeline p = TestPipeline.fromOptions(options); - StreamingShardedWriteFactory<Object> factory = + StreamingShardedWriteFactory<Object, Void, Object> factory = new StreamingShardedWriteFactory<>(p.getOptions()); - WriteFiles<Object> original = WriteFiles.to(new TestSink(tmpFolder.toString())); + WriteFiles<Object, Void, Object> original = + WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity()); PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); - AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication = - AppliedPTransform.of( - "writefiles", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p); - - WriteFiles<Object> replacement = (WriteFiles<Object>) - factory.getReplacementTransform(originalApplication).getTransform(); + AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>> + originalApplication = + AppliedPTransform.of( + "writefiles", + objs.expand(), + Collections.<TupleTag<?>, PValue>emptyMap(), + original, + p); + + WriteFiles<Object, Void, Object> replacement = + (WriteFiles<Object, Void, Object>) + factory.getReplacementTransform(originalApplication).getTransform(); assertThat(replacement, not(equalTo((Object) original))); assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards)); } - private static class TestSink extends FileBasedSink<Object> { + private static class TestSink extends FileBasedSink<Object, Void> { @Override public void validate(PipelineOptions options) {} TestSink(String tmpFolder) { - super(StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)), - null); + 
super( + StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)), + DynamicFileDestinations.constant(null)); } + @Override - public WriteOperation<Object> createWriteOperation() { + public WriteOperation<Object, Void> createWriteOperation() { throw new IllegalArgumentException("Should not be used"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java index 64ff98c..246eb81 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -52,7 +52,6 @@ import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Test; - /** * Test {@link SparkRunnerDebugger} with different pipelines. 
*/ @@ -85,17 +84,20 @@ public class SparkRunnerDebuggerTest { .apply(MapElements.via(new WordCount.FormatAsTextFn())) .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt")); - final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n" - + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n" - + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n" - + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n" - + "_.groupByKey()\n" - + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n" - + "_.mapPartitions(new org.apache.beam.runners.spark" - + ".SparkRunnerDebuggerTest$PlusOne())\n" - + "sparkContext.union(...)\n" - + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n" - + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>"; + final String expectedPipeline = + "sparkContext.parallelize(Arrays.asList(...))\n" + + "_.mapPartitions(" + + "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n" + + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n" + + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n" + + "_.groupByKey()\n" + + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n" + + "_.mapPartitions(new org.apache.beam.runners.spark" + + ".SparkRunnerDebuggerTest$PlusOne())\n" + + "sparkContext.union(...)\n" + + "_.mapPartitions(" + + "new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n" + + "_.<org.apache.beam.sdk.io.TextIO$Write>"; SparkRunnerDebugger.DebugSparkPipelineResult result = (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git 
a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 24e907a..1f74afb 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -367,9 +367,12 @@ message WriteFilesPayload { // (Required) The SdkFunctionSpec of the FileBasedSink. SdkFunctionSpec sink = 1; - bool windowed_writes = 2; + // (Required) The format function. + SdkFunctionSpec format_function = 2; - bool runner_determined_sharding = 3; + bool windowed_writes = 3; + + bool runner_determined_sharding = 4; } // A coder, the binary format for serialization and deserialization of data in http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java new file mode 100644 index 0000000..a86b198 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.coders; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.values.ShardedKey; + +/** A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. */ +@VisibleForTesting +public class ShardedKeyCoder<KeyT> extends StructuredCoder<ShardedKey<KeyT>> { + public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) { + return new ShardedKeyCoder<>(keyCoder); + } + + private final Coder<KeyT> keyCoder; + private final VarIntCoder shardNumberCoder; + + protected ShardedKeyCoder(Coder<KeyT> keyCoder) { + this.keyCoder = keyCoder; + this.shardNumberCoder = VarIntCoder.of(); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.asList(keyCoder); + } + + @Override + public void encode(ShardedKey<KeyT> key, OutputStream outStream) + throws IOException { + keyCoder.encode(key.getKey(), outStream); + shardNumberCoder.encode(key.getShardNumber(), outStream); + } + + @Override + public ShardedKey<KeyT> decode(InputStream inStream) + throws IOException { + return ShardedKey.of(keyCoder.decode(inStream), shardNumberCoder.decode(inStream)); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + keyCoder.verifyDeterministic(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 4143db2..89cadbd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableMap; @@ -35,6 +34,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.io.fs.ResourceId; @@ -43,6 +43,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.PBegin; @@ -52,18 +53,19 @@ import org.apache.beam.sdk.values.PDone; /** * {@link PTransform}s for reading and writing Avro files. * - * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, - * using {@link AvroIO.Read#from} to specify the filename or filepattern to read from. - * See {@link FileSystems} for information on supported file systems and filepatterns. + * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using + * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link + * FileSystems} for information on supported file systems and filepatterns. 
* - * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. - * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes - * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a + * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read + * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a + * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified * schema. * * <p>For example: - * <pre> {@code + * + * <pre>{@code * Pipeline p = ...; * * // A simple Read of a local file (only runs locally): @@ -75,34 +77,33 @@ import org.apache.beam.sdk.values.PDone; * PCollection<GenericRecord> records = * p.apply(AvroIO.readGenericRecords(schema) * .from("gs://my_bucket/path/to/records-*.avro")); - * } </pre> + * }</pre> * * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using - * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default - * {@link DefaultFilenamePolicy} will use this prefix, in conjunction with a - * {@link ShardNameTemplate} (set via {@link Write#withShardNameTemplate(String)}) and optional - * filename suffix (set via {@link Write#withSuffix(String)}, to generate output filenames in a - * sharded way. You can override this default write filename policy using - * {@link Write#withFilenamePolicy(FileBasedSink.FilenamePolicy)} to specify a custom file naming - * policy. + * {@code AvroIO.write().to(String)} to specify the output filename prefix. 
The default {@link + * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set + * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link + * Write#withSuffix(String)}), to generate output filenames in a sharded way. You can override this + * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a + * custom file naming policy. * * <p>By default, all input is put into the global window before writing. If per-window writes are - * desired - for example, when using a streaming runner - - * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be - * preserved. When producing windowed writes, the number of output shards must be set explicitly - * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a - * runner-chosen value, so you may need not set it yourself. A - * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce - * unique filenames. + * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()} + * will cause windowing and triggering to be preserved. When producing windowed writes with a + * streaming runner that supports triggers, the number of output shards must be set explicitly using + * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen + * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be + * set, and unique windows and triggers must produce unique filenames. * - * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. - * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} - * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema - * in a JSON-encoded string form.
An exception will be thrown if a record doesn't match the - * specified schema. + * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write + * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes + * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a + * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified + * schema. * * <p>For example: - * <pre> {@code + * + * <pre>{@code * // A simple Write to a local file (only runs locally): * PCollection<AvroAutoGenClass> records = ...; * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro")); @@ -113,11 +114,11 @@ import org.apache.beam.sdk.values.PDone; * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema) * .to("gs://my_bucket/path/to/numbers") * .withSuffix(".avro")); - * } </pre> + * }</pre> * - * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the - * {@link org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can - * be changed or overridden using {@link AvroIO.Write#withCodec}. + * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link + * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or + * overridden using {@link AvroIO.Write#withCodec}. 
*/ public class AvroIO { /** @@ -258,11 +259,16 @@ public class AvroIO { @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix(); @Nullable abstract String getShardTemplate(); @Nullable abstract String getFilenameSuffix(); + + @Nullable + abstract ValueProvider<ResourceId> getTempDirectory(); + abstract int getNumShards(); @Nullable abstract Class<T> getRecordClass(); @Nullable abstract Schema getSchema(); abstract boolean getWindowedWrites(); @Nullable abstract FilenamePolicy getFilenamePolicy(); + /** * The codec used to encode the blocks in the Avro file. String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html @@ -277,6 +283,9 @@ public class AvroIO { abstract static class Builder<T> { abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); abstract Builder<T> setFilenameSuffix(String filenameSuffix); + + abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory); + abstract Builder<T> setNumShards(int numShards); abstract Builder<T> setShardTemplate(String shardTemplate); abstract Builder<T> setRecordClass(Class<T> recordClass); @@ -296,9 +305,9 @@ public class AvroIO { * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. * * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the - * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and - * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be - * overridden using {@link #withFilenamePolicy(FilenamePolicy)}. + * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and a + * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden + * using {@link #to(FilenamePolicy)}. 
*/ public Write<T> to(String outputPrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)); @@ -306,14 +315,21 @@ public class AvroIO { /** * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on - * supported file systems. - * - * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. + * supported file systems. This prefix is used by the {@link DefaultFilenamePolicy} to generate + * filenames. * * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the - * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and - * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be - * overridden using {@link #withFilenamePolicy(FilenamePolicy)}. + * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and a + * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden + * using {@link #to(FilenamePolicy)}. + * + * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case + * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set. + * Custom filename policies do not automatically see this prefix - you should explicitly pass + * the prefix into your {@link FilenamePolicy} object if you need this. + * + * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to + * infer a directory for temporary files. */ @Experimental(Kind.FILESYSTEM) public Write<T> to(ResourceId outputPrefix) { @@ -342,15 +358,22 @@ public class AvroIO { } /** - * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files. + * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A + * directory for temporary files must be specified using {@link #withTempDirectory}. 
*/ - public Write<T> withFilenamePolicy(FilenamePolicy filenamePolicy) { + public Write<T> to(FilenamePolicy filenamePolicy) { return toBuilder().setFilenamePolicy(filenamePolicy).build(); } + /** Set the base directory used to generate temporary files. */ + @Experimental(Kind.FILESYSTEM) + public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { + return toBuilder().setTempDirectory(tempDirectory).build(); + } + /** * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be - * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. + * used when using one of the default filename-prefix to() overrides. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. @@ -360,8 +383,8 @@ public class AvroIO { } /** - * Configures the filename suffix for written files. This option may only be used when - * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. + * Configures the filename suffix for written files. This option may only be used when using one + * of the default filename-prefix to() overrides. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. @@ -402,9 +425,8 @@ public class AvroIO { /** * Preserves windowing of input elements and writes them to files based on the element's window. * - * <p>Requires use of {@link #withFilenamePolicy(FileBasedSink.FilenamePolicy)}. Filenames will - * be generated using {@link FilenamePolicy#windowedFilename}. See also - * {@link WriteFiles#withWindowedWrites()}. + * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using + * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. 
*/ public Write<T> withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); @@ -435,32 +457,46 @@ public class AvroIO { return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } - @Override - public PDone expand(PCollection<T> input) { - checkState(getFilenamePrefix() != null, - "Need to set the filename prefix of an AvroIO.Write transform."); - checkState( - (getFilenamePolicy() == null) - || (getShardTemplate() == null && getFilenameSuffix() == null), - "Cannot set a filename policy and also a filename template or suffix."); - checkState(getSchema() != null, - "Need to set the schema of an AvroIO.Write transform."); - checkState(!getWindowedWrites() || (getFilenamePolicy() != null), - "When using windowed writes, a filename policy must be set via withFilenamePolicy()."); - + DynamicDestinations<T, Void> resolveDynamicDestinations() { FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); if (usedFilenamePolicy == null) { - usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters( - getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); + usedFilenamePolicy = + DefaultFilenamePolicy.fromStandardParameters( + getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); + } + return DynamicFileDestinations.constant(usedFilenamePolicy); + } + + @Override + public PDone expand(PCollection<T> input) { + checkArgument( + getFilenamePrefix() != null || getTempDirectory() != null, + "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write " + + "transform."); + if (getFilenamePolicy() != null) { + checkArgument( + getShardTemplate() == null && getFilenameSuffix() == null, + "shardTemplate and filenameSuffix should only be used with the default " + + "filename policy"); } + return expandTyped(input, resolveDynamicDestinations()); + } - WriteFiles<T> write = WriteFiles.to( - new AvroSink<>( - getFilenamePrefix(), - usedFilenamePolicy, - 
AvroCoder.of(getRecordClass(), getSchema()), - getCodec(), - getMetadata())); + public <DestinationT> PDone expandTyped( + PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) { + ValueProvider<ResourceId> tempDirectory = getTempDirectory(); + if (tempDirectory == null) { + tempDirectory = getFilenamePrefix(); + } + WriteFiles<T, DestinationT, T> write = + WriteFiles.to( + new AvroSink<>( + tempDirectory, + dynamicDestinations, + AvroCoder.of(getRecordClass(), getSchema()), + getCodec(), + getMetadata()), + SerializableFunctions.<T>identity()); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -473,31 +509,25 @@ public class AvroIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - checkState( - getFilenamePrefix() != null, - "Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix)."); - String outputPrefixString = null; - if (getFilenamePrefix().isAccessible()) { - ResourceId dir = getFilenamePrefix().get(); - outputPrefixString = dir.toString(); - } else { - outputPrefixString = getFilenamePrefix().toString(); + resolveDynamicDestinations().populateDisplayData(builder); + + String tempDirectory = null; + if (getTempDirectory() != null) { + tempDirectory = + getTempDirectory().isAccessible() + ? 
getTempDirectory().get().toString() + : getTempDirectory().toString(); } builder - .add(DisplayData.item("schema", getRecordClass()) - .withLabel("Record Schema")) - .addIfNotNull(DisplayData.item("filePrefix", outputPrefixString) - .withLabel("Output File Prefix")) - .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate()) - .withLabel("Output Shard Name Template")) - .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix()) - .withLabel("Output File Suffix")) - .addIfNotDefault(DisplayData.item("numShards", getNumShards()) - .withLabel("Maximum Output Shards"), - 0) - .addIfNotDefault(DisplayData.item("codec", getCodec().toString()) - .withLabel("Avro Compression Codec"), - DEFAULT_CODEC.toString()); + .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema")) + .addIfNotDefault( + DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) + .addIfNotDefault( + DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"), + DEFAULT_CODEC.toString()) + .addIfNotNull( + DisplayData.item("tempDirectory", tempDirectory) + .withLabel("Directory for temporary files")); builder.include("Metadata", new Metadata()); } http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index 6c36266..c78870b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -32,39 +32,40 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.MimeTypes; /** A {@link FileBasedSink} for Avro files. 
*/ -class AvroSink<T> extends FileBasedSink<T> { +class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; private final ImmutableMap<String, Object> metadata; AvroSink( ValueProvider<ResourceId> outputPrefix, - FilenamePolicy filenamePolicy, + DynamicDestinations<T, DestinationT> dynamicDestinations, AvroCoder<T> coder, SerializableAvroCodecFactory codec, ImmutableMap<String, Object> metadata) { // Avro handle compression internally using the codec. - super(outputPrefix, filenamePolicy, CompressionType.UNCOMPRESSED); + super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED); this.coder = coder; this.codec = codec; this.metadata = metadata; } @Override - public WriteOperation<T> createWriteOperation() { + public WriteOperation<T, DestinationT> createWriteOperation() { return new AvroWriteOperation<>(this, coder, codec, metadata); } /** A {@link WriteOperation WriteOperation} for Avro files. 
*/ - private static class AvroWriteOperation<T> extends WriteOperation<T> { + private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; private final ImmutableMap<String, Object> metadata; - private AvroWriteOperation(AvroSink<T> sink, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { + private AvroWriteOperation( + AvroSink<T, DestinationT> sink, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { super(sink); this.coder = coder; this.codec = codec; @@ -72,22 +73,23 @@ class AvroSink<T> extends FileBasedSink<T> { } @Override - public Writer<T> createWriter() throws Exception { + public Writer<T, DestinationT> createWriter() throws Exception { return new AvroWriter<>(this, coder, codec, metadata); } } /** A {@link Writer Writer} for Avro files. */ - private static class AvroWriter<T> extends Writer<T> { + private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> { private final AvroCoder<T> coder; private DataFileWriter<T> dataFileWriter; private SerializableAvroCodecFactory codec; private final ImmutableMap<String, Object> metadata; - public AvroWriter(WriteOperation<T> writeOperation, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { + public AvroWriter( + WriteOperation<T, DestinationT> writeOperation, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { super(writeOperation, MimeTypes.BINARY); this.coder = coder; this.codec = codec; http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index f9e4ac4..7a60e49 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -20,25 +20,31 @@ package org.apache.beam.sdk.io; import static com.google.common.base.MoreObjects.firstNonNull; import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.text.DecimalFormat; import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A default {@link FilenamePolicy} for windowed and unwindowed files. 
This policy is constructed @@ -51,10 +57,7 @@ import org.slf4j.LoggerFactory; * {@code WriteOneFilePerWindow} example pipeline. */ public final class DefaultFilenamePolicy extends FilenamePolicy { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class); - - /** The default sharding name template used in {@link #constructUsingStandardParameters}. */ + /** The default sharding name template. */ public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The default windowed sharding name template used when writing windowed files. @@ -67,75 +70,184 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE; /* - * pattern for both windowed and non-windowed file names + * pattern for both windowed and non-windowed file names. */ private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)"); /** + * Encapsulates constructor parameters to {@link DefaultFilenamePolicy}. + * + * <p>This is used as the {@code DestinationT} argument to allow {@link DefaultFilenamePolicy} + * objects to be dynamically generated. + */ + public static class Params implements Serializable { + private final ValueProvider<ResourceId> baseFilename; + private final String shardTemplate; + private final boolean explicitTemplate; + private final String suffix; + + /** + * Construct a default Params object. The shard template will be set to the default {@link + * #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} value. 
+ */ + public Params() { + this.baseFilename = null; + this.shardTemplate = DEFAULT_UNWINDOWED_SHARD_TEMPLATE; + this.suffix = ""; + this.explicitTemplate = false; + } + + private Params( + ValueProvider<ResourceId> baseFilename, + String shardTemplate, + String suffix, + boolean explicitTemplate) { + this.baseFilename = baseFilename; + this.shardTemplate = shardTemplate; + this.suffix = suffix; + this.explicitTemplate = explicitTemplate; + } + + /** + * Specify that writes are windowed. This affects the default shard template, changing it to + * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE}. + */ + public Params withWindowedWrites() { + String template = this.shardTemplate; + if (!explicitTemplate) { + template = DEFAULT_WINDOWED_SHARD_TEMPLATE; + } + return new Params(baseFilename, template, suffix, explicitTemplate); + } + + /** Sets the base filename. */ + public Params withBaseFilename(ResourceId baseFilename) { + return withBaseFilename(StaticValueProvider.of(baseFilename)); + } + + /** Like {@link #withBaseFilename(ResourceId)}, but takes in a {@link ValueProvider}. */ + public Params withBaseFilename(ValueProvider<ResourceId> baseFilename) { + return new Params(baseFilename, shardTemplate, suffix, explicitTemplate); + } + + /** Sets the shard template. */ + public Params withShardTemplate(String shardTemplate) { + return new Params(baseFilename, shardTemplate, suffix, true); + } + + /** Sets the suffix. */ + public Params withSuffix(String suffix) { + return new Params(baseFilename, shardTemplate, suffix, explicitTemplate); + } + } + + /** A Coder for {@link Params}. 
*/ + public static class ParamsCoder extends AtomicCoder<Params> { + private static final ParamsCoder INSTANCE = new ParamsCoder(); + private Coder<String> stringCoder = StringUtf8Coder.of(); + + public static ParamsCoder of() { + return INSTANCE; + } + + @Override + public void encode(Params value, OutputStream outStream) throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null value"); + } + stringCoder.encode(value.baseFilename.get().toString(), outStream); + stringCoder.encode(value.shardTemplate, outStream); + stringCoder.encode(value.suffix, outStream); + } + + @Override + public Params decode(InputStream inStream) throws IOException { + ResourceId prefix = + FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream)); + String shardTemplate = stringCoder.decode(inStream); + String suffix = stringCoder.decode(inStream); + return new Params() + .withBaseFilename(prefix) + .withShardTemplate(shardTemplate) + .withSuffix(suffix); + } + } + + private final Params params; + /** * Constructs a new {@link DefaultFilenamePolicy}. * * @see DefaultFilenamePolicy for more information on the arguments to this function. */ @VisibleForTesting - DefaultFilenamePolicy(ValueProvider<String> prefix, String shardTemplate, String suffix) { - this.prefix = prefix; - this.shardTemplate = shardTemplate; - this.suffix = suffix; + DefaultFilenamePolicy(Params params) { + this.params = params; } /** - * A helper function to construct a {@link DefaultFilenamePolicy} using the standard filename - * parameters, namely a provided {@link ResourceId} for the output prefix, and possibly-null - * shard name template and suffix. + * Construct a {@link DefaultFilenamePolicy}. * - * <p>Any filename component of the provided resource will be used as the filename prefix. 
+ * <p>This is a shortcut for: * - * <p>If provided, the shard name template will be used; otherwise - * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and - * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names. + * <pre>{@code + * DefaultFilenamePolicy.fromParams(new Params() + * .withBaseFilename(baseFilename) + * .withShardTemplate(shardTemplate) + * .withSuffix(filenameSuffix) + * .withWindowedWrites()) + * }</pre> * - * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix. + * <p>Where the respective {@code with} methods are invoked only if the value is non-null or true. */ - public static DefaultFilenamePolicy constructUsingStandardParameters( - ValueProvider<ResourceId> outputPrefix, + public static DefaultFilenamePolicy fromStandardParameters( + ValueProvider<ResourceId> baseFilename, @Nullable String shardTemplate, @Nullable String filenameSuffix, boolean windowedWrites) { - // Pick the appropriate default policy based on whether windowed writes are being performed. - String defaultTemplate = - windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE; - return new DefaultFilenamePolicy( - NestedValueProvider.of(outputPrefix, new ExtractFilename()), - firstNonNull(shardTemplate, defaultTemplate), - firstNonNull(filenameSuffix, "")); + Params params = new Params().withBaseFilename(baseFilename); + if (shardTemplate != null) { + params = params.withShardTemplate(shardTemplate); + } + if (filenameSuffix != null) { + params = params.withSuffix(filenameSuffix); + } + if (windowedWrites) { + params = params.withWindowedWrites(); + } + return fromParams(params); } - private final ValueProvider<String> prefix; - private final String shardTemplate; - private final String suffix; + /** Construct a {@link DefaultFilenamePolicy} from a {@link Params} object. 
*/ + public static DefaultFilenamePolicy fromParams(Params params) { + return new DefaultFilenamePolicy(params); + } /** * Constructs a fully qualified name from components. * - * <p>The name is built from a prefix, shard template (with shard numbers - * applied), and a suffix. All components are required, but may be empty - * strings. + * <p>The name is built from a base filename, shard template (with shard numbers applied), and a + * suffix. All components are required, but may be empty strings. * - * <p>Within a shard template, repeating sequences of the letters "S" or "N" - * are replaced with the shard number, or number of shards respectively. - * "P" is replaced with by stringification of current pane. - * "W" is replaced by stringification of current window. + * <p>Within a shard template, repeating sequences of the letters "S" or "N" are replaced with the + * shard number, or number of shards respectively. "P" is replaced by a stringification of the + * current pane. "W" is replaced by a stringification of the current window. * - * <p>The numbers are formatted with leading zeros to match the length of the - * repeated sequence of letters. + * <p>The numbers are formatted with leading zeros to match the length of the repeated sequence of + * letters. * - * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and - * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is - * produced: "output-001-of-100.txt". + * <p>For example, if baseFilename = "path/to/output", shardTemplate = "-SSS-of-NNN", and suffix = + * ".txt", with shardNum = 1 and numShards = 100, the following is produced: + * "path/to/output-001-of-100.txt".
*/ - static String constructName( - String prefix, String shardTemplate, String suffix, int shardNum, int numShards, - String paneStr, String windowStr) { + static ResourceId constructName( + ResourceId baseFilename, + String shardTemplate, + String suffix, + int shardNum, + int numShards, + String paneStr, + String windowStr) { + String prefix = extractFilename(baseFilename); // Matcher API works with StringBuffer, rather than StringBuilder. StringBuffer sb = new StringBuffer(); sb.append(prefix); @@ -165,27 +277,37 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { m.appendTail(sb); sb.append(suffix); - return sb.toString(); + return baseFilename + .getCurrentDirectory() + .resolve(sb.toString(), StandardResolveOptions.RESOLVE_FILE); } @Override @Nullable - public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, - String extension) { - String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(), - context.getNumShards(), null, null) + extension; - return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + return constructName( + params.baseFilename.get(), + params.shardTemplate, + params.suffix + outputFileHints.getSuggestedFilenameSuffix(), + context.getShardNumber(), + context.getNumShards(), + null, + null); } @Override - public ResourceId windowedFilename(ResourceId outputDirectory, - WindowedContext context, String extension) { + public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { final PaneInfo paneInfo = context.getPaneInfo(); String paneStr = paneInfoToString(paneInfo); String windowStr = windowToString(context.getWindow()); - String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(), - context.getNumShards(), paneStr, windowStr) + extension; - return outputDirectory.resolve(filename, 
StandardResolveOptions.RESOLVE_FILE); + return constructName( + params.baseFilename.get(), + params.shardTemplate, + params.suffix + outputFileHints.getSuggestedFilenameSuffix(), + context.getShardNumber(), + context.getNumShards(), + paneStr, + windowStr); } /* @@ -216,24 +338,32 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { @Override public void populateDisplayData(DisplayData.Builder builder) { String filenamePattern; - if (prefix.isAccessible()) { - filenamePattern = String.format("%s%s%s", prefix.get(), shardTemplate, suffix); + if (params.baseFilename.isAccessible()) { + filenamePattern = + String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix); } else { - filenamePattern = String.format("%s%s%s", prefix, shardTemplate, suffix); + filenamePattern = + String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix); } + + String outputPrefixString = null; + outputPrefixString = + params.baseFilename.isAccessible() + ? 
params.baseFilename.get().toString() + : params.baseFilename.toString(); + builder.add(DisplayData.item("filenamePattern", filenamePattern).withLabel("Filename Pattern")); + builder.add(DisplayData.item("filePrefix", outputPrefixString).withLabel("Output File Prefix")); + builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix")); builder.add( - DisplayData.item("filenamePattern", filenamePattern) - .withLabel("Filename Pattern")); + DisplayData.item("shardNameTemplate", params.shardTemplate) + .withLabel("Output Shard Name Template")); } - private static class ExtractFilename implements SerializableFunction<ResourceId, String> { - @Override - public String apply(ResourceId input) { - if (input.isDirectory()) { - return ""; - } else { - return firstNonNull(input.getFilename(), ""); - } + private static String extractFilename(ResourceId input) { + if (input.isDirectory()) { + return ""; + } else { + return firstNonNull(input.getFilename(), ""); } } }
