[BEAM-92] Supports DynamicDestinations in AvroIO.
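This change moves the per-record format function from WriteFiles into DynamicDestinations (see DynamicDestinations.formatRecord / DynamicFileDestinations.constant below) and adds a TypedWrite entry point to AvroIO. As an illustration only — this sketch is not code from the commit; UserEvent, its getUserId() accessor, and the schema/paths are hypothetical placeholders — the simplest use of the new API looks like this:

// Illustrative sketch; UserEvent and getUserId() are hypothetical. The schema is
// kept as a JSON string so the anonymous SerializableFunction stays serializable
// without capturing a Schema instance (the same String-backed trick the new
// ConstantAvroDestination class uses internally).
static final String SCHEMA_JSON =
    "{\"type\": \"record\", \"name\": \"Event\", "
        + "\"fields\": [{\"name\": \"userId\", \"type\": \"int\"}]}";

PCollection<UserEvent> events = ...;
events.apply(
    "WriteEvents",
    AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
        .withSchema(new Schema.Parser().parse(SCHEMA_JSON))
        .withFormatFunction(
            new SerializableFunction<UserEvent, GenericRecord>() {
              @Override
              public GenericRecord apply(UserEvent event) {
                // Convert the custom input type to the GenericRecord that is written.
                Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
                GenericRecord record = new GenericData.Record(schema);
                record.put("userId", event.getUserId());
                return record;
              }
            })
        .to("/path/to/output/events"));

The fully dynamic variant, which picks a destination, schema, and filename policy per record via DynamicAvroDestinations, is shown in the AvroIO.java javadoc inside the diff below.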
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f2622fa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f2622fa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f2622fa Branch: refs/heads/master Commit: 9f2622fa19da1284222e872fdcd63b086bdc3509 Parents: 1f2634d Author: Reuven Lax <[email protected]> Authored: Thu Jul 6 20:22:25 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Jul 28 17:28:12 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ParDoTranslation.java | 2 +- .../construction/WriteFilesTranslation.java | 81 ++-- .../construction/PTransformMatchersTest.java | 10 +- .../construction/WriteFilesTranslationTest.java | 26 +- .../direct/WriteWithShardingFactory.java | 10 +- .../direct/WriteWithShardingFactoryTest.java | 8 +- .../beam/runners/dataflow/DataflowRunner.java | 8 +- .../runners/dataflow/DataflowRunnerTest.java | 10 +- .../src/main/proto/beam_runner_api.proto | 2 + .../java/org/apache/beam/sdk/io/AvroIO.java | 436 +++++++++++++++---- .../java/org/apache/beam/sdk/io/AvroSink.java | 93 ++-- .../beam/sdk/io/ConstantAvroDestination.java | 130 ++++++ .../beam/sdk/io/DefaultFilenamePolicy.java | 1 - .../beam/sdk/io/DynamicAvroDestinations.java | 46 ++ .../beam/sdk/io/DynamicFileDestinations.java | 59 ++- .../org/apache/beam/sdk/io/FileBasedSink.java | 121 +++-- .../java/org/apache/beam/sdk/io/TFRecordIO.java | 23 +- .../java/org/apache/beam/sdk/io/TextIO.java | 228 ++++++---- .../java/org/apache/beam/sdk/io/TextSink.java | 14 +- .../java/org/apache/beam/sdk/io/WriteFiles.java | 116 ++--- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 156 ++++++- .../apache/beam/sdk/io/FileBasedSinkTest.java | 6 +- .../java/org/apache/beam/sdk/io/SimpleSink.java | 10 +- .../org/apache/beam/sdk/io/TextIOWriteTest.java | 23 +- .../org/apache/beam/sdk/io/WriteFilesTest.java | 74 ++-- .../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +- .../org/apache/beam/sdk/io/xml/XmlSink.java | 8 +- 27 files changed, 1214 insertions(+), 491 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index d7b0e9f..5765c51 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -484,7 +484,7 @@ public class ParDoTranslation { }); } - private static SideInput toProto(PCollectionView<?> view) { + public static SideInput toProto(PCollectionView<?> view) { Builder builder = SideInput.newBuilder(); builder.setAccessPattern( FunctionSpec.newBuilder() http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java ---------------------------------------------------------------------- diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index b1d2da4..7954b0e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -19,29 +19,35 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.service.AutoService; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import java.io.IOException; import java.io.Serializable; import java.util.Collections; +import java.util.List; import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TupleTag; /** * Utility methods for translating a {@link WriteFiles} to and from {@link RunnerApi} @@ -53,28 +59,25 @@ public class WriteFilesTranslation { public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN = "urn:beam:file_based_sink:javasdk:0.1"; - public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN = - "urn:beam:file_based_sink_format_function:javasdk:0.1"; - @VisibleForTesting static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) { + Map<String, SideInput> sideInputs = Maps.newHashMap(); + for (PCollectionView<?> view : transform.getSink().getDynamicDestinations().getSideInputs()) { + sideInputs.put(view.getTagInternal().getId(), ParDoTranslation.toProto(view)); + } return WriteFilesPayload.newBuilder() .setSink(toProto(transform.getSink())) - .setFormatFunction(toProto(transform.getFormatFunction())) .setWindowedWrites(transform.isWindowedWrites()) .setRunnerDeterminedSharding( transform.getNumShards() == null && transform.getSharding() == null) + .putAllSideInputs(sideInputs) .build(); } - private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) { + private static SdkFunctionSpec toProto(FileBasedSink<?, ?, ?> sink) { return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink); } - private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) { - return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction); - } - private static SdkFunctionSpec toProto(String urn, Serializable serializable) { 
return SdkFunctionSpec.newBuilder() .setSpec( @@ -91,7 +94,7 @@ public class WriteFilesTranslation { } @VisibleForTesting - static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException { + static FileBasedSink<?, ?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException { checkArgument( sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN), "Cannot extract %s instance from %s with URN %s", @@ -102,44 +105,44 @@ public class WriteFilesTranslation { byte[] serializedSink = sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); - return (FileBasedSink<?, ?>) + return (FileBasedSink<?, ?, ?>) SerializableUtils.deserializeFromByteArray( serializedSink, FileBasedSink.class.getSimpleName()); } - @VisibleForTesting - static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto( - SdkFunctionSpec sinkProto) throws IOException { - checkArgument( - sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN), - "Cannot extract %s instance from %s with URN %s", - SerializableFunction.class.getSimpleName(), - FunctionSpec.class.getSimpleName(), - sinkProto.getSpec().getUrn()); - - byte[] serializedFunction = - sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); - - return (SerializableFunction<InputT, OutputT>) - SerializableUtils.deserializeFromByteArray( - serializedFunction, FileBasedSink.class.getSimpleName()); - } - - public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink( + public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink( AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>> transform) throws IOException { - return (FileBasedSink<OutputT, DestinationT>) + return (FileBasedSink<UserT, DestinationT, OutputT>) sinkFromProto(getWriteFilesPayload(transform).getSink()); } - public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction( - AppliedPTransform< - PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>> - transform) - throws IOException { - return formatFunctionFromProto( - getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction()); + public static <UserT, DestinationT, OutputT> + List<PCollectionView<?>> getDynamicDestinationSideInputs( + AppliedPTransform< + PCollection<UserT>, PDone, ? 
extends PTransform<PCollection<UserT>, PDone>> + transform) + throws IOException { + SdkComponents sdkComponents = SdkComponents.create(); + RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents); + List<PCollectionView<?>> views = Lists.newArrayList(); + Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap(); + for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) { + PCollection<?> originalPCollection = + checkNotNull( + (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())), + "no input with tag %s", + entry.getKey()); + views.add( + ParDoTranslation.viewFromProto( + entry.getValue(), + entry.getKey(), + originalPCollection, + transformProto, + RehydratedComponents.forComponents(sdkComponents.toComponents()))); + } + return views; } public static <T> boolean isWindowedWrites( http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 316645b..1862699 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -57,7 +57,6 @@ import org.apache.beam.sdk.transforms.Materialization; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; @@ -549,15 +548,14 @@ public class PTransformMatchersTest implements Serializable { false); WriteFiles<Integer, Void, Integer> write = WriteFiles.to( - new FileBasedSink<Integer, Void>( + new FileBasedSink<Integer, Void, Integer>( StaticValueProvider.of(outputDirectory), - DynamicFileDestinations.constant(new FakeFilenamePolicy())) { + DynamicFileDestinations.<Integer>constant(new FakeFilenamePolicy())) { @Override - public WriteOperation<Integer, Void> createWriteOperation() { + public WriteOperation<Void, Integer> createWriteOperation() { return null; } - }, - SerializableFunctions.<Integer>identity()); + }); assertThat( PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)), is(true)); http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index 4259ac8..e067fac 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -63,12 +62,11 @@ public class WriteFilesTranslationTest { public static class TestWriteFilesPayloadTranslation { @Parameters(name = "{index}: {0}") public static Iterable<WriteFiles<Object, Void, Object>> data() { - SerializableFunction<Object, Object> format = SerializableFunctions.constant(null); return ImmutableList.of( - WriteFiles.to(new DummySink(), format), - WriteFiles.to(new DummySink(), format).withWindowedWrites(), - WriteFiles.to(new DummySink(), format).withNumShards(17), - WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42)); + WriteFiles.to(new DummySink()), + WriteFiles.to(new DummySink()).withWindowedWrites(), + WriteFiles.to(new DummySink()).withNumShards(17), + WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42)); } @Parameter(0) @@ -87,7 +85,8 @@ public class WriteFilesTranslationTest { assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites())); assertThat( - (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()), + (FileBasedSink<String, Void, String>) + WriteFilesTranslation.sinkFromProto(payload.getSink()), equalTo(writeFiles.getSink())); } @@ -118,16 +117,17 @@ public class WriteFilesTranslationTest { * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid * any issues serializing mocks. 
*/ - private static class DummySink extends FileBasedSink<Object, Void> { + private static class DummySink extends FileBasedSink<Object, Void, Object> { DummySink() { super( StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)), - DynamicFileDestinations.constant(new DummyFilenamePolicy())); + DynamicFileDestinations.constant( + new DummyFilenamePolicy(), SerializableFunctions.constant(null))); } @Override - public WriteOperation<Object, Void> createWriteOperation() { + public WriteOperation<Void, Object> createWriteOperation() { return new DummyWriteOperation(this); } @@ -152,13 +152,13 @@ public class WriteFilesTranslationTest { } } - private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> { - public DummyWriteOperation(FileBasedSink<Object, Void> sink) { + private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Void, Object> { + public DummyWriteOperation(FileBasedSink<Object, Void, Object> sink) { super(sink); } @Override - public FileBasedSink.Writer<Object, Void> createWriter() throws Exception { + public FileBasedSink.Writer<Void, Object> createWriter() throws Exception { throw new UnsupportedOperationException("Should never be called."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index ba796ae..3557c5d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -24,10 +24,12 @@ import com.google.common.base.Suppliers; import java.io.IOException; import java.io.Serializable; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.WriteFilesTranslation; +import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -61,10 +63,10 @@ class WriteWithShardingFactory<InputT> AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>> transform) { try { - WriteFiles<InputT, ?, ?> replacement = - WriteFiles.to( - WriteFilesTranslation.getSink(transform), - WriteFilesTranslation.getFormatFunction(transform)); + List<PCollectionView<?>> sideInputs = + WriteFilesTranslation.getDynamicDestinationSideInputs(transform); + FileBasedSink sink = WriteFilesTranslation.getSink(transform); + WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs); if (WriteFilesTranslation.isWindowedWrites(transform)) { replacement = replacement.withWindowedWrites(); } http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 6dd069c..d0db44e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -143,15 +142,14 @@ public class WriteWithShardingFactoryTest implements Serializable { PTransform<PCollection<Object>, PDone> original = WriteFiles.to( - new FileBasedSink<Object, Void>( + new FileBasedSink<Object, Void, Object>( StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(new FakeFilenamePolicy())) { @Override - public WriteOperation<Object, Void> createWriteOperation() { + public WriteOperation<Void, Object> createWriteOperation() { throw new IllegalArgumentException("Should not be used"); } - }, - SerializableFunctions.identity()); + }); @SuppressWarnings("unchecked") PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 762ac9f..f8d2c3c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -92,6 +92,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; @@ -1501,10 +1502,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } try { + List<PCollectionView<?>> sideInputs = + WriteFilesTranslation.getDynamicDestinationSideInputs(transform); + FileBasedSink sink = WriteFilesTranslation.getSink(transform); WriteFiles<UserT, DestinationT, OutputT> replacement = - WriteFiles.<UserT, DestinationT, OutputT>to( - WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform), - WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform)); + WriteFiles.to(sink).withSideInputs(sideInputs); if (WriteFilesTranslation.isWindowedWrites(transform)) { replacement = replacement.withWindowedWrites(); } http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 7556a28..9db73c6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1271,8 +1271,7 @@ public class DataflowRunnerTest implements Serializable { StreamingShardedWriteFactory<Object, Void, Object> factory = new StreamingShardedWriteFactory<>(p.getOptions()); - WriteFiles<Object, Void, Object> original = - WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity()); + WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString())); PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>> originalApplication = @@ -1290,7 +1289,7 @@ public class DataflowRunnerTest implements Serializable { assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards)); } - private static class TestSink extends FileBasedSink<Object, Void> { + private static class TestSink extends FileBasedSink<Object, Void, Object> { @Override public void validate(PipelineOptions options) {} @@ -1315,11 +1314,12 @@ public class DataflowRunnerTest implements Serializable { int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("should not be called"); } - })); + }, + SerializableFunctions.identity())); } @Override - public WriteOperation<Object, Void> createWriteOperation() { + public WriteOperation<Void, Object> createWriteOperation() { throw new IllegalArgumentException("Should not be used"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 42e2601..9afb565 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -375,6 +375,8 @@ message WriteFilesPayload { bool windowed_writes = 3; bool runner_determined_sharding = 4; + + map<string, SideInput> side_inputs = 5; } // A coder, the binary format for serialization and deserialization of data in http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 27c9073..824f725 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.google.common.io.BaseEncoding; import java.util.Map; import javax.annotation.Nullable; import org.apache.avro.Schema; @@ -40,7 +39,6 @@ import org.apache.beam.sdk.coders.Coder; import 
org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; @@ -51,7 +49,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -161,6 +158,51 @@ import org.apache.beam.sdk.values.TypeDescriptors; * .withSuffix(".avro")); * }</pre> * + * <p>The following shows a more complex example of AvroIO.Write usage, generating dynamic file + * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user + * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id + * as an integer field. We want events for each user to go into a specific directory for that user, + * and each user's data should be written with a specific schema for that user; a side input is + * used, so the schema can be calculated in a different stage. + * + * <pre>{@code + * // This is the user class that controls dynamic destinations for this avro write. The input to + * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order + * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type + * // of Integer. + * class UserDynamicAvroDestinations + *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> { + *   private final PCollectionView<Map<Integer, String>> userToSchemaMap; + *   public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) { + *     this.userToSchemaMap = userToSchemaMap; + *   } + *   public GenericRecord formatRecord(UserEvent record) { + *     return formatUserRecord(record, getSchema(record.getUserId())); + *   } + *   public Schema getSchema(Integer userId) { + *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId)); + *   } + *   public Integer getDestination(UserEvent record) { + *     return record.getUserId(); + *   } + *   public Integer getDefaultDestination() { + *     return 0; + *   } + *   public FilenamePolicy getFilenamePolicy(Integer userId) { + *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-" + *         + userId + "/events")); + *   } + *   public List<PCollectionView<?>> getSideInputs() { + *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap); + *   } + * } + * PCollection<UserEvent> events = ...; + * PCollectionView<Map<Integer, String>> schemaMap = events.apply( + *     "ComputeSchemas", new ComputePerUserSchemas()); + * events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords() + *     .to(new UserDynamicAvroDestinations(schemaMap))); + * }</pre> + * * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or * overridden using {@link AvroIO.Write#withCodec}. @@ -256,18 +298,53 @@ public class AvroIO { * pattern). 
*/ public static <T> Write<T> write(Class<T> recordClass) { - return AvroIO.<T>defaultWriteBuilder() - .setRecordClass(recordClass) - .setSchema(ReflectData.get().getSchema(recordClass)) - .build(); + return new Write<>( + AvroIO.<T, T>defaultWriteBuilder() + .setGenericRecords(false) + .setSchema(ReflectData.get().getSchema(recordClass)) + .build()); } /** Writes Avro records of the specified schema. */ public static Write<GenericRecord> writeGenericRecords(Schema schema) { - return AvroIO.<GenericRecord>defaultWriteBuilder() - .setRecordClass(GenericRecord.class) - .setSchema(schema) - .build(); + return new Write<>( + AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder() + .setGenericRecords(true) + .setSchema(schema) + .build()); + } + + /** + * A {@link PTransform} that writes a {@link PCollection} to an Avro file (or multiple Avro files + * matching a sharding pattern), with each element of the input collection encoded into its own + * record of type OutputT. + * + * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type + * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type + * that will be written to the file must be specified. If using a custom {@link + * DynamicAvroDestinations} object this is done using {@link + * DynamicAvroDestinations#formatRecord}, otherwise the {@link + * AvroIO.TypedWrite#withFormatFunction} can be used to specify a format function. + * + * <p>The advantage of using a custom type is that it allows a user-provided {@link + * DynamicAvroDestinations} object, set via {@link AvroIO.Write#to(DynamicAvroDestinations)}, to + * examine the custom type when choosing a destination. + * + * <p>If the output type is {@link GenericRecord}, use {@link #writeCustomTypeToGenericRecords()} + * instead. + */ + public static <UserT, OutputT> TypedWrite<UserT, OutputT> writeCustomType() { + return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build(); + } + + /** + * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is + * {@link GenericRecord}. A schema must be specified either in {@link + * DynamicAvroDestinations#getSchema} or, if not using dynamic destinations, by using {@link + * TypedWrite#withSchema(Schema)}. + */ + public static <UserT> TypedWrite<UserT, GenericRecord> writeCustomTypeToGenericRecords() { + return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build(); } /** @@ -277,12 +354,12 @@ public class AvroIO { return writeGenericRecords(new Schema.Parser().parse(schema)); } - private static <T> Write.Builder<T> defaultWriteBuilder() { - return new AutoValue_AvroIO_Write.Builder<T>() + private static <UserT, OutputT> TypedWrite.Builder<UserT, OutputT> defaultWriteBuilder() { + return new AutoValue_AvroIO_TypedWrite.Builder<UserT, OutputT>() .setFilenameSuffix(null) .setShardTemplate(null) .setNumShards(0) - .setCodec(Write.DEFAULT_CODEC) + .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC) .setMetadata(ImmutableMap.<String, Object>of()) .setWindowedWrites(false); } @@ -572,15 +649,18 @@ public class AvroIO { } } - ///////////////////////////////////////////////////////////////////////////// + // /////////////////////////////////////////////////////////////////////////// /** Implementation of {@link #write}. 
*/ @AutoValue - public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { - private static final SerializableAvroCodecFactory DEFAULT_CODEC = - new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); - // This should be a multiple of 4 to not get a partial encoded byte. - private static final int METADATA_BYTES_MAX_LENGTH = 40; + public abstract static class TypedWrite<UserT, OutputT> + extends PTransform<PCollection<UserT>, PDone> { + static final CodecFactory DEFAULT_CODEC = CodecFactory.deflateCodec(6); + static final SerializableAvroCodecFactory DEFAULT_SERIALIZABLE_CODEC = + new SerializableAvroCodecFactory(DEFAULT_CODEC); + + @Nullable + abstract SerializableFunction<UserT, OutputT> getFormatFunction(); @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix(); @Nullable abstract String getShardTemplate(); @@ -590,11 +670,16 @@ public class AvroIO { abstract ValueProvider<ResourceId> getTempDirectory(); abstract int getNumShards(); - @Nullable abstract Class<T> getRecordClass(); + + abstract boolean getGenericRecords(); + @Nullable abstract Schema getSchema(); abstract boolean getWindowedWrites(); @Nullable abstract FilenamePolicy getFilenamePolicy(); + @Nullable + abstract DynamicAvroDestinations<UserT, ?, OutputT> getDynamicDestinations(); + /** * The codec used to encode the blocks in the Avro file. String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html @@ -603,25 +688,39 @@ public class AvroIO { /** Avro file metadata. */ abstract ImmutableMap<String, Object> getMetadata(); - abstract Builder<T> toBuilder(); + abstract Builder<UserT, OutputT> toBuilder(); @AutoValue.Builder - abstract static class Builder<T> { - abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); - abstract Builder<T> setFilenameSuffix(String filenameSuffix); + abstract static class Builder<UserT, OutputT> { + abstract Builder<UserT, OutputT> setFormatFunction( + SerializableFunction<UserT, OutputT> formatFunction); - abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory); + abstract Builder<UserT, OutputT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); - abstract Builder<T> setNumShards(int numShards); - abstract Builder<T> setShardTemplate(String shardTemplate); - abstract Builder<T> setRecordClass(Class<T> recordClass); - abstract Builder<T> setSchema(Schema schema); - abstract Builder<T> setWindowedWrites(boolean windowedWrites); - abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy); - abstract Builder<T> setCodec(SerializableAvroCodecFactory codec); - abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata); + abstract Builder<UserT, OutputT> setFilenameSuffix(String filenameSuffix); + + abstract Builder<UserT, OutputT> setTempDirectory(ValueProvider<ResourceId> tempDirectory); + + abstract Builder<UserT, OutputT> setNumShards(int numShards); + + abstract Builder<UserT, OutputT> setShardTemplate(String shardTemplate); + + abstract Builder<UserT, OutputT> setGenericRecords(boolean genericRecords); - abstract Write<T> build(); + abstract Builder<UserT, OutputT> setSchema(Schema schema); + + abstract Builder<UserT, OutputT> setWindowedWrites(boolean windowedWrites); + + abstract Builder<UserT, OutputT> setFilenamePolicy(FilenamePolicy filenamePolicy); + + abstract Builder<UserT, OutputT> setCodec(SerializableAvroCodecFactory codec); + + abstract Builder<UserT, OutputT> setMetadata(ImmutableMap<String, Object> 
metadata); + + abstract Builder<UserT, OutputT> setDynamicDestinations( + DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations); + + abstract TypedWrite<UserT, OutputT> build(); } /** @@ -635,7 +734,7 @@ public class AvroIO { * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden * using {@link #to(FilenamePolicy)}. */ - public Write<T> to(String outputPrefix) { + public TypedWrite<UserT, OutputT> to(String outputPrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)); } @@ -658,14 +757,12 @@ * infer a directory for temporary files. */ @Experimental(Kind.FILESYSTEM) - public Write<T> to(ResourceId outputPrefix) { + public TypedWrite<UserT, OutputT> to(ResourceId outputPrefix) { return toResource(StaticValueProvider.of(outputPrefix)); } - /** - * Like {@link #to(String)}. - */ + /** Like {@link #to(String)}. */ public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) { return toResource(NestedValueProvider.of(outputPrefix, new SerializableFunction<String, ResourceId>() { @Override @@ -675,11 +772,9 @@ })); } - /** - * Like {@link #to(ResourceId)}. - */ + /** Like {@link #to(ResourceId)}. */ @Experimental(Kind.FILESYSTEM) - public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) { + public TypedWrite<UserT, OutputT> toResource(ValueProvider<ResourceId> outputPrefix) { return toBuilder().setFilenamePrefix(outputPrefix).build(); } @@ -687,16 +782,52 @@ * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A * directory for temporary files must be specified using {@link #withTempDirectory}. */ - public Write<T> to(FilenamePolicy filenamePolicy) { + @Experimental(Kind.FILESYSTEM) + public TypedWrite<UserT, OutputT> to(FilenamePolicy filenamePolicy) { return toBuilder().setFilenamePolicy(filenamePolicy).build(); } + /** + * Use a {@link DynamicAvroDestinations} object to vend {@link FilenamePolicy} objects. These + * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for + * temporary files must be specified using {@link #withTempDirectory}. + */ + @Experimental(Kind.FILESYSTEM) + public TypedWrite<UserT, OutputT> to( + DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations) { + return toBuilder().setDynamicDestinations(dynamicDestinations).build(); + } + + /** + * Sets the output schema. Can only be used when the output type is {@link GenericRecord} + * and when not using {@link #to(DynamicAvroDestinations)}. + */ + public TypedWrite<UserT, OutputT> withSchema(Schema schema) { + return toBuilder().setSchema(schema).build(); + } + + /** + * Specifies a format function to convert {@link UserT} to the output type. If {@link + * #to(DynamicAvroDestinations)} is used, {@link DynamicAvroDestinations#formatRecord} must be + * used instead. + */ + public TypedWrite<UserT, OutputT> withFormatFunction( + SerializableFunction<UserT, OutputT> formatFunction) { + return toBuilder().setFormatFunction(formatFunction).build(); + } + /** Set the base directory used to generate temporary files. 
*/ @Experimental(Kind.FILESYSTEM) - public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { + public TypedWrite<UserT, OutputT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { return toBuilder().setTempDirectory(tempDirectory).build(); } + /** Set the base directory used to generate temporary files. */ + @Experimental(Kind.FILESYSTEM) + public TypedWrite<UserT, OutputT> withTempDirectory(ResourceId tempDirectory) { + return withTempDirectory(StaticValueProvider.of(tempDirectory)); + } + /** * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be * used when using one of the default filename-prefix to() overrides. * * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. */ - public Write<T> withShardNameTemplate(String shardTemplate) { + public TypedWrite<UserT, OutputT> withShardNameTemplate(String shardTemplate) { return toBuilder().setShardTemplate(shardTemplate).build(); } @@ -715,7 +846,7 @@ * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. */ - public Write<T> withSuffix(String filenameSuffix) { + public TypedWrite<UserT, OutputT> withSuffix(String filenameSuffix) { return toBuilder().setFilenameSuffix(filenameSuffix).build(); } @@ -729,7 +860,7 @@ * * @param numShards the number of shards to use, or 0 to let the system decide. */ - public Write<T> withNumShards(int numShards) { + public TypedWrite<UserT, OutputT> withNumShards(int numShards) { checkArgument(numShards >= 0); return toBuilder().setNumShards(numShards).build(); } @@ -744,7 +875,7 @@ * * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")} */ - public Write<T> withoutSharding() { + public TypedWrite<UserT, OutputT> withoutSharding() { return withNumShards(1).withShardNameTemplate(""); } @@ -754,12 +885,12 @@ * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. */ - public Write<T> withWindowedWrites() { + public TypedWrite<UserT, OutputT> withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); } /** Writes to Avro file(s) compressed using the specified codec. */ - public Write<T> withCodec(CodecFactory codec) { + public TypedWrite<UserT, OutputT> withCodec(CodecFactory codec) { return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build(); } @@ -768,7 +899,7 @@ * * <p>Supported value types are String, Long, and byte[]. 
*/ - public Write<T> withMetadata(Map<String, Object> metadata) { + public TypedWrite<UserT, OutputT> withMetadata(Map<String, Object> metadata) { Map<String, String> badKeys = Maps.newLinkedHashMap(); for (Map.Entry<String, Object> entry : metadata.entrySet()) { Object v = entry.getValue(); @@ -783,18 +914,31 @@ return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } - DynamicDestinations<T, Void> resolveDynamicDestinations() { - FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); - if (usedFilenamePolicy == null) { - usedFilenamePolicy = - DefaultFilenamePolicy.fromStandardParameters( - getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); + DynamicAvroDestinations<UserT, ?, OutputT> resolveDynamicDestinations() { + DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations = getDynamicDestinations(); + if (dynamicDestinations == null) { + FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); + if (usedFilenamePolicy == null) { + usedFilenamePolicy = + DefaultFilenamePolicy.fromStandardParameters( + getFilenamePrefix(), + getShardTemplate(), + getFilenameSuffix(), + getWindowedWrites()); + } + dynamicDestinations = + constantDestinations( + usedFilenamePolicy, + getSchema(), + getMetadata(), + getCodec().getCodec(), + getFormatFunction()); } - return DynamicFileDestinations.constant(usedFilenamePolicy); + return dynamicDestinations; } @Override - public PDone expand(PCollection<T> input) { + public PDone expand(PCollection<UserT> input) { checkArgument( getFilenamePrefix() != null || getTempDirectory() != null, "Need to set either the filename prefix or the tempDirectory of an AvroIO.Write " + "transform."); if (getFilenamePolicy() != null) { checkArgument( getShardTemplate() == null && getFilenameSuffix() == null, "shardTemplate and filenameSuffix should only be used with the default " + "filename policy"); } + if (getDynamicDestinations() != null) { + checkArgument( + getFormatFunction() == null, + "A format function should not be specified " + + "with DynamicDestinations. 
Use DynamicDestinations.formatRecord instead"); + } + return expandTyped(input, resolveDynamicDestinations()); } public <DestinationT> PDone expandTyped( - PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) { + PCollection<UserT> input, + DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations) { ValueProvider<ResourceId> tempDirectory = getTempDirectory(); if (tempDirectory == null) { tempDirectory = getFilenamePrefix(); } - WriteFiles<T, DestinationT, T> write = - WriteFiles.to( - new AvroSink<>( - tempDirectory, - dynamicDestinations, - AvroCoder.of(getRecordClass(), getSchema()), - getCodec(), - getMetadata()), - SerializableFunctions.<T>identity()); + WriteFiles<UserT, DestinationT, OutputT> write = + WriteFiles.to(new AvroSink<>(tempDirectory, dynamicDestinations, getGenericRecords())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -845,33 +990,11 @@ public class AvroIO { : getTempDirectory().toString(); } builder - .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema")) .addIfNotDefault( DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) - .addIfNotDefault( - DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"), - DEFAULT_CODEC.toString()) .addIfNotNull( DisplayData.item("tempDirectory", tempDirectory) .withLabel("Directory for temporary files")); - builder.include("Metadata", new Metadata()); - } - - private class Metadata implements HasDisplayData { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - for (Map.Entry<String, Object> entry : getMetadata().entrySet()) { - DisplayData.Type type = DisplayData.inferType(entry.getValue()); - if (type != null) { - builder.add(DisplayData.item(entry.getKey(), type, entry.getValue())); - } else { - String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue()); - String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH - ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "..."; - builder.add(DisplayData.item(entry.getKey(), repr)); - } - } - } } @Override @@ -880,6 +1003,131 @@ public class AvroIO { } } + /** + * This class is used as the default return value of {@link AvroIO#write} + * + * <p>All methods in this class delegate to the appropriate method of {@link AvroIO.TypedWrite}. + * This class exists for backwards compatibility, and will be removed in Beam 3.0. + */ + public static class Write<T> extends PTransform<PCollection<T>, PDone> { + @VisibleForTesting TypedWrite<T, T> inner; + + Write(TypedWrite<T, T> inner) { + this.inner = inner; + } + + /** See {@link TypedWrite#to(String)}. */ + public Write<T> to(String outputPrefix) { + return new Write<>( + inner + .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)) + .withFormatFunction(SerializableFunctions.<T>identity())); + } + + /** See {@link TypedWrite#to(ResourceId)} . */ + @Experimental(Kind.FILESYSTEM) + public Write<T> to(ResourceId outputPrefix) { + return new Write<T>( + inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity())); + } + + /** See {@link TypedWrite#to(ValueProvider)}. */ + public Write<T> to(ValueProvider<String> outputPrefix) { + return new Write<>( + inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity())); + } + + /** See {@link TypedWrite#to(ResourceId)}. 
*/ + @Experimental(Kind.FILESYSTEM) + public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) { + return new Write<>( + inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity())); + } + + /** See {@link TypedWrite#to(FilenamePolicy)}. */ + public Write<T> to(FilenamePolicy filenamePolicy) { + return new Write<>( + inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<T>identity())); + } + + /** See {@link TypedWrite#to(DynamicAvroDestinations)}. */ + public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) { + return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null)); + } + + /** See {@link TypedWrite#withSchema}. */ + public Write<T> withSchema(Schema schema) { + return new Write<>(inner.withSchema(schema)); + } + /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */ + @Experimental(Kind.FILESYSTEM) + public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { + return new Write<>(inner.withTempDirectory(tempDirectory)); + } + + /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */ + public Write<T> withTempDirectory(ResourceId tempDirectory) { + return new Write<>(inner.withTempDirectory(tempDirectory)); + } + + /** See {@link TypedWrite#withShardNameTemplate}. */ + public Write<T> withShardNameTemplate(String shardTemplate) { + return new Write<>(inner.withShardNameTemplate(shardTemplate)); + } + + /** See {@link TypedWrite#withSuffix}. */ + public Write<T> withSuffix(String filenameSuffix) { + return new Write<>(inner.withSuffix(filenameSuffix)); + } + + /** See {@link TypedWrite#withNumShards}. */ + public Write<T> withNumShards(int numShards) { + return new Write<>(inner.withNumShards(numShards)); + } + + /** See {@link TypedWrite#withoutSharding}. */ + public Write<T> withoutSharding() { + return new Write<>(inner.withoutSharding()); + } + + /** See {@link TypedWrite#withWindowedWrites}. */ + public Write<T> withWindowedWrites() { + return new Write<>(inner.withWindowedWrites()); + } + + /** See {@link TypedWrite#withCodec}. */ + public Write<T> withCodec(CodecFactory codec) { + return new Write<>(inner.withCodec(codec)); + } + + /** See {@link TypedWrite#withMetadata}. */ + public Write<T> withMetadata(Map<String, Object> metadata) { + return new Write<>(inner.withMetadata(metadata)); + } + + @Override + public PDone expand(PCollection<T> input) { + return inner.expand(input); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + inner.populateDisplayData(builder); + } + } + + /** + * Returns a {@link DynamicAvroDestinations} that always returns the same {@link FilenamePolicy}, + * schema, metadata, and codec. + */ + public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations( + FilenamePolicy filenamePolicy, + Schema schema, + Map<String, Object> metadata, + CodecFactory codec, + SerializableFunction<UserT, OutputT> formatFunction) { + return new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction); + } ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index c78870b..acd3ea6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -17,93 +17,90 @@ */ package org.apache.beam.sdk.io; -import com.google.common.collect.ImmutableMap; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.MimeTypes; /** A {@link FileBasedSink} for Avro files. */ -class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> { - private final AvroCoder<T> coder; - private final SerializableAvroCodecFactory codec; - private final ImmutableMap<String, Object> metadata; +class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, DestinationT, OutputT> { + private final DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations; + private final boolean genericRecords; AvroSink( ValueProvider<ResourceId> outputPrefix, - DynamicDestinations<T, DestinationT> dynamicDestinations, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { + DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations, + boolean genericRecords) { // Avro handles compression internally using the codec. super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; + this.dynamicDestinations = dynamicDestinations; + this.genericRecords = genericRecords; } @Override - public WriteOperation<T, DestinationT> createWriteOperation() { - return new AvroWriteOperation<>(this, coder, codec, metadata); + public DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() { + return (DynamicAvroDestinations<UserT, DestinationT, OutputT>) super.getDynamicDestinations(); + } + + @Override + public WriteOperation<DestinationT, OutputT> createWriteOperation() { + return new AvroWriteOperation<>(this, genericRecords); } /** A {@link WriteOperation WriteOperation} for Avro files. 
*/ - private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> { - private final AvroCoder<T> coder; - private final SerializableAvroCodecFactory codec; - private final ImmutableMap<String, Object> metadata; + private static class AvroWriteOperation<DestinationT, OutputT> + extends WriteOperation<DestinationT, OutputT> { + private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations; + private final boolean genericRecords; - private AvroWriteOperation( - AvroSink<T, DestinationT> sink, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { + private AvroWriteOperation(AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords) { super(sink); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; + this.dynamicDestinations = sink.getDynamicDestinations(); + this.genericRecords = genericRecords; } @Override - public Writer<T, DestinationT> createWriter() throws Exception { - return new AvroWriter<>(this, coder, codec, metadata); + public Writer<DestinationT, OutputT> createWriter() throws Exception { + return new AvroWriter<>(this, dynamicDestinations, genericRecords); } } /** A {@link Writer Writer} for Avro files. */ - private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> { - private final AvroCoder<T> coder; - private DataFileWriter<T> dataFileWriter; - private SerializableAvroCodecFactory codec; - private final ImmutableMap<String, Object> metadata; + private static class AvroWriter<DestinationT, OutputT> extends Writer<DestinationT, OutputT> { + private DataFileWriter<OutputT> dataFileWriter; + private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations; + private final boolean genericRecords; public AvroWriter( - WriteOperation<T, DestinationT> writeOperation, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { + WriteOperation<DestinationT, OutputT> writeOperation, + DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations, + boolean genericRecords) { super(writeOperation, MimeTypes.BINARY); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; + this.dynamicDestinations = dynamicDestinations; + this.genericRecords = genericRecords; } @SuppressWarnings("deprecation") // uses internal test functionality. @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { - DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class) - ? new GenericDatumWriter<T>(coder.getSchema()) - : new ReflectDatumWriter<T>(coder.getSchema()); + DestinationT destination = getDestination(); + CodecFactory codec = dynamicDestinations.getCodec(destination); + Schema schema = dynamicDestinations.getSchema(destination); + Map<String, Object> metadata = dynamicDestinations.getMetadata(destination); - dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec()); + DatumWriter<OutputT> datumWriter = + genericRecords + ? 
new GenericDatumWriter<OutputT>(schema) + : new ReflectDatumWriter<OutputT>(schema); + dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec); for (Map.Entry<String, Object> entry : metadata.entrySet()) { Object v = entry.getValue(); if (v instanceof String) { @@ -118,11 +115,11 @@ class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> { + v.getClass().getSimpleName()); } } - dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); + dataFileWriter.create(schema, Channels.newOutputStream(channel)); } @Override - public void write(T value) throws Exception { + public void write(OutputT value) throws Exception { dataFileWriter.append(value); } http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java new file mode 100644 index 0000000..b006e26 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.io.BaseEncoding; +import java.io.Serializable; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; + +/** Always returns a constant {@link FilenamePolicy}, {@link Schema}, metadata, and codec. */ +class ConstantAvroDestination<UserT, OutputT> + extends DynamicAvroDestinations<UserT, Void, OutputT> { + private static class SchemaFunction implements Serializable, Function<String, Schema> { + @Nullable + @Override + public Schema apply(@Nullable String input) { + return new Schema.Parser().parse(input); + } + } + + // This should be a multiple of 4 to not get a partial encoded byte. 
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 4021609..1f438d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -157,7 +157,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
         && shardTemplate.equals(other.shardTemplate)
         && suffix.equals(other.suffix);
   }
-
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
new file mode 100644
index 0000000..f4e8ee6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link AvroIO}. In addition to dynamic file
+ * destinations, this allows specifying other AVRO properties (schema, metadata, codec) per
+ * destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>
+    extends DynamicDestinations<UserT, DestinationT, OutputT> {
+  /** Return an AVRO schema for a given destination. */
+  public abstract Schema getSchema(DestinationT destination);
+
+  /** Return AVRO file metadata for a given destination. */
+  public Map<String, Object> getMetadata(DestinationT destination) {
+    return ImmutableMap.<String, Object>of();
+  }
+
+  /** Return an AVRO codec for a given destination. */
+  public CodecFactory getCodec(DestinationT destination) {
+    return AvroIO.TypedWrite.DEFAULT_CODEC;
+  }
+}
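To make the new extension point concrete, here is a hedged sketch of a user-defined DynamicAvroDestinations that routes GenericRecords by event type and serves a per-type schema and codec. The "eventType" field, the schema map, the /tmp base path, the snappy codec, and the use of DefaultFilenamePolicy.fromParams are all assumptions for illustration; only the overridden methods come from the class above.

    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
    import org.apache.beam.sdk.io.DynamicAvroDestinations;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.FileSystems;

    // Hypothetical subclass; not part of this commit.
    class PerEventTypeDestinations
        extends DynamicAvroDestinations<GenericRecord, String, GenericRecord> {
      // destination -> schema JSON (kept as String so the object stays serializable)
      private final Map<String, String> schemaJsonByType;

      PerEventTypeDestinations(Map<String, String> schemaJsonByType) {
        this.schemaJsonByType = schemaJsonByType;
      }

      @Override
      public GenericRecord formatRecord(GenericRecord record) {
        return record; // records are already in output form
      }

      @Override
      public String getDestination(GenericRecord element) {
        return element.get("eventType").toString();
      }

      @Override
      public String getDefaultDestination() {
        return "unknown";
      }

      @Override
      public FilenamePolicy getFilenamePolicy(String destination) {
        // One output directory per event type; the base path is illustrative.
        return DefaultFilenamePolicy.fromParams(
            new Params()
                .withBaseFilename(
                    FileSystems.matchNewResource("/tmp/events/" + destination + "/part", false)));
      }

      @Override
      public Schema getSchema(String destination) {
        return new Schema.Parser().parse(schemaJsonByType.get(destination));
      }

      @Override
      public CodecFactory getCodec(String destination) {
        return CodecFactory.snappyCodec();
      }
    }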
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index d05a01a7..b087bc5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import javax.annotation.Nullable;
@@ -28,20 +27,30 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
 /** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
 public class DynamicFileDestinations {
   /** Always returns a constant {@link FilenamePolicy}. */
-  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+  private static class ConstantFilenamePolicy<UserT, OutputT>
+      extends DynamicDestinations<UserT, Void, OutputT> {
     private final FilenamePolicy filenamePolicy;
+    private final SerializableFunction<UserT, OutputT> formatFunction;
 
-    public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
-      this.filenamePolicy = checkNotNull(filenamePolicy);
+    public ConstantFilenamePolicy(
+        FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+      this.filenamePolicy = filenamePolicy;
+      this.formatFunction = formatFunction;
     }
 
     @Override
-    public Void getDestination(T element) {
+    public OutputT formatRecord(UserT record) {
+      return formatFunction.apply(record);
+    }
+
+    @Override
+    public Void getDestination(UserT element) {
       return (Void) null;
     }
 
@@ -71,14 +80,24 @@ public class DynamicFileDestinations {
    * A base class for a {@link DynamicDestinations} object that returns differently-configured
    * instances of {@link DefaultFilenamePolicy}.
    */
-  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
-    SerializableFunction<UserT, Params> destinationFunction;
-    Params emptyDestination;
+  private static class DefaultPolicyDestinations<UserT, OutputT>
+      extends DynamicDestinations<UserT, Params, OutputT> {
+    private final SerializableFunction<UserT, Params> destinationFunction;
+    private final Params emptyDestination;
+    private final SerializableFunction<UserT, OutputT> formatFunction;
 
     public DefaultPolicyDestinations(
-        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+        SerializableFunction<UserT, Params> destinationFunction,
+        Params emptyDestination,
+        SerializableFunction<UserT, OutputT> formatFunction) {
       this.destinationFunction = destinationFunction;
       this.emptyDestination = emptyDestination;
+      this.formatFunction = formatFunction;
+    }
+
+    @Override
+    public OutputT formatRecord(UserT record) {
+      return formatFunction.apply(record);
     }
 
     @Override
@@ -104,16 +123,28 @@ public class DynamicFileDestinations {
   }
 
   /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
-  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
-    return new ConstantFilenamePolicy<>(filenamePolicy);
+  public static <UserT, OutputT> DynamicDestinations<UserT, Void, OutputT> constant(
+      FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+    return new ConstantFilenamePolicy<>(filenamePolicy, formatFunction);
+  }
+
+  /**
+   * A specialization of {@link #constant(FilenamePolicy, SerializableFunction)} for the case where
+   * UserT and OutputT are the same type and the format function is the identity.
+   */
+  public static <UserT> DynamicDestinations<UserT, Void, UserT> constant(
+      FilenamePolicy filenamePolicy) {
+    return new ConstantFilenamePolicy<>(filenamePolicy, SerializableFunctions.<UserT>identity());
   }
 
   /**
   * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
   * configured with the given {@link Params}.
   */
-  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
-      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
-    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+  public static <UserT, OutputT> DynamicDestinations<UserT, Params, OutputT> toDefaultPolicies(
+      SerializableFunction<UserT, Params> destinationFunction,
+      Params emptyDestination,
+      SerializableFunction<UserT, OutputT> formatFunction) {
+    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination, formatFunction);
   }
 }
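The practical effect of the DynamicFileDestinations change: the record-formatting function, which WriteFilesPayload previously carried as a separate format_function, now travels inside the DynamicDestinations object itself. A hedged sketch of calling the widened toDefaultPolicies helper follows; the KV<String, Long> record type and the CSV formatting are assumptions, not anything this commit prescribes.

    import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
    import org.apache.beam.sdk.io.DynamicFileDestinations;
    import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.KV;

    // Illustrative only: route by a caller-supplied destination function while
    // rendering each KV as a CSV line via the new formatFunction parameter.
    class CsvDestinationsExample {
      static DynamicDestinations<KV<String, Long>, Params, String> create(
          SerializableFunction<KV<String, Long>, Params> destinationFunction,
          Params emptyDestination) {
        return DynamicFileDestinations.toDefaultPolicies(
            destinationFunction,
            emptyDestination,
            new SerializableFunction<KV<String, Long>, String>() {
              @Override
              public String apply(KV<String, Long> kv) {
                return kv.getKey() + "," + kv.getValue();
              }
            });
      }
    }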
