http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java index 511d697..b57b28c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java @@ -34,27 +34,29 @@ import org.apache.beam.sdk.util.MimeTypes; * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the * last) is terminated. */ -class TextSink extends FileBasedSink<String> { +class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> { @Nullable private final String header; @Nullable private final String footer; TextSink( ValueProvider<ResourceId> baseOutputFilename, - FilenamePolicy filenamePolicy, + DynamicDestinations<UserT, DestinationT> dynamicDestinations, @Nullable String header, @Nullable String footer, WritableByteChannelFactory writableByteChannelFactory) { - super(baseOutputFilename, filenamePolicy, writableByteChannelFactory); + super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory); this.header = header; this.footer = footer; } + @Override - public WriteOperation<String> createWriteOperation() { - return new TextWriteOperation(this, header, footer); + public WriteOperation<String, DestinationT> createWriteOperation() { + return new TextWriteOperation<>(this, header, footer); } /** A {@link WriteOperation WriteOperation} for text files. */ - private static class TextWriteOperation extends WriteOperation<String> { + private static class TextWriteOperation<DestinationT> + extends WriteOperation<String, DestinationT> { @Nullable private final String header; @Nullable private final String footer; @@ -65,20 +67,20 @@ class TextSink extends FileBasedSink<String> { } @Override - public Writer<String> createWriter() throws Exception { - return new TextWriter(this, header, footer); + public Writer<String, DestinationT> createWriter() throws Exception { + return new TextWriter<>(this, header, footer); } } /** A {@link Writer Writer} for text files. */ - private static class TextWriter extends Writer<String> { + private static class TextWriter<DestinationT> extends Writer<String, DestinationT> { private static final String NEWLINE = "\n"; @Nullable private final String header; @Nullable private final String footer; private OutputStreamWriter out; public TextWriter( - WriteOperation<String> writeOperation, + WriteOperation<String, DestinationT> writeOperation, @Nullable String header, @Nullable String footer) { super(writeOperation, MimeTypes.TEXT);
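The TextSink changes above follow from the new WriteFiles contract in the next file: a sink is now paired with a SerializableFunction<UserT, OutputT> that formats the user type into the record actually written. When the two types coincide, the identity helper this commit adds (SerializableFunctions.java, further down) can serve as the format function. A minimal, self-contained sketch of those helpers in use; illustrative only, and the demo class name is ours:

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;

/** Illustrative only: exercises the helpers added in SerializableFunctions.java below. */
public class SerializableFunctionsDemo {
  public static void main(String[] args) {
    // identity() suits sinks whose user type already is the written record type,
    // e.g. WriteFiles.to(sink, SerializableFunctions.<String>identity()).
    SerializableFunction<String, String> format = SerializableFunctions.identity();
    System.out.println(format.apply("hello")); // prints "hello"

    // constant() maps every input to a fixed value.
    SerializableFunction<Long, String> label = SerializableFunctions.constant("record");
    System.out.println(label.apply(42L)); // prints "record"
  }
}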
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index a220eab..7013044 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -20,9 +20,12 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.UUID; @@ -30,8 +33,11 @@ import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.FileResult; @@ -47,6 +53,7 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -55,6 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -62,6 +70,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.slf4j.Logger; @@ -72,13 +81,12 @@ import org.slf4j.LoggerFactory; * global initialization of a sink, followed by a parallel write, and ends with a sequential * finalization of the write. The output of a write is {@link PDone}. * - * <p>By default, every bundle in the input {@link PCollection} will be processed by a - * {@link WriteOperation}, so the number of output - * will vary based on runner behavior, though at least 1 output will always be produced. The - * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards}, - * typically used to control how many files are produced or to globally limit the number of - * workers connecting to an external service. 
However, this option can often hurt performance: it - * adds an additional {@link GroupByKey} to the pipeline. + * <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link + * WriteOperation}, so the number of output will vary based on runner behavior, though at least 1 + * output will always be produced. The exact parallelism of the write stage can be controlled using + * {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to + * globally limit the number of workers connecting to an external service. However, this option can + * often hurt performance: it adds an additional {@link GroupByKey} to the pipeline. * * <p>Example usage with runner-determined sharding: * @@ -89,7 +97,8 @@ import org.slf4j.LoggerFactory; * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre> */ @Experimental(Experimental.Kind.SOURCE_SINK) -public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { +public class WriteFiles<UserT, DestinationT, OutputT> + extends PTransform<PCollection<UserT>, PDone> { private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class); // The maximum number of file writers to keep open in a single bundle at a time, since file @@ -105,12 +114,12 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; static final int UNKNOWN_SHARDNUM = -1; - private FileBasedSink<T> sink; - private WriteOperation<T> writeOperation; + private FileBasedSink<OutputT, DestinationT> sink; + private SerializableFunction<UserT, OutputT> formatFunction; + private WriteOperation<OutputT, DestinationT> writeOperation; // This allows the number of shards to be dynamically computed based on the input // PCollection. - @Nullable - private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards; + @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards; // We don't use a side input for static sharding, as we want this value to be updatable // when a pipeline is updated. @Nullable @@ -122,19 +131,28 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting * the runner control how many different shards are produced. 
*/ - public static <T> WriteFiles<T> to(FileBasedSink<T> sink) { + public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to( + FileBasedSink<OutputT, DestinationT> sink, + SerializableFunction<UserT, OutputT> formatFunction) { checkNotNull(sink, "sink"); - return new WriteFiles<>(sink, null /* runner-determined sharding */, null, - false, DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE); + return new WriteFiles<>( + sink, + formatFunction, + null /* runner-determined sharding */, + null, + false, + DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE); } private WriteFiles( - FileBasedSink<T> sink, - @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, + FileBasedSink<OutputT, DestinationT> sink, + SerializableFunction<UserT, OutputT> formatFunction, + @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards, @Nullable ValueProvider<Integer> numShardsProvider, boolean windowedWrites, int maxNumWritersPerBundle) { this.sink = sink; + this.formatFunction = checkNotNull(formatFunction); this.computeNumShards = computeNumShards; this.numShardsProvider = numShardsProvider; this.windowedWrites = windowedWrites; @@ -142,7 +160,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } @Override - public PDone expand(PCollection<T> input) { + public PDone expand(PCollection<UserT> input) { if (input.isBounded() == IsBounded.UNBOUNDED) { checkArgument(windowedWrites, "Must use windowed writes when applying %s to an unbounded PCollection", @@ -181,13 +199,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } } - /** - * Returns the {@link FileBasedSink} associated with this PTransform. - */ - public FileBasedSink<T> getSink() { + /** Returns the {@link FileBasedSink} associated with this PTransform. */ + public FileBasedSink<OutputT, DestinationT> getSink() { return sink; } + /** Returns the format function that maps the user type to the record written to files. */ + public SerializableFunction<UserT, OutputT> getFormatFunction() { + return formatFunction; + } + /** * Returns whether or not to perform windowed writes. */ @@ -202,7 +223,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * #withRunnerDeterminedSharding()}. */ @Nullable - public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() { + public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() { return computeNumShards; } @@ -220,7 +241,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * <p>A value less than or equal to 0 will be equivalent to the default behavior of * runner-determined sharding. */ - public WriteFiles<T> withNumShards(int numShards) { + public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) { if (numShards > 0) { return withNumShards(StaticValueProvider.of(numShards)); } @@ -234,16 +255,26 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for * more information.
*/ - public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) { - return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites, + public WriteFiles<UserT, DestinationT, OutputT> withNumShards( + ValueProvider<Integer> numShardsProvider) { + return new WriteFiles<>( + sink, + formatFunction, + computeNumShards, + numShardsProvider, + windowedWrites, maxNumWritersPerBundle); } - /** - * Set the maximum number of writers created in a bundle before spilling to shuffle. - */ - public WriteFiles<T> withMaxNumWritersPerBundle(int maxNumWritersPerBundle) { - return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites, + /** Set the maximum number of writers created in a bundle before spilling to shuffle. */ + public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle( + int maxNumWritersPerBundle) { + return new WriteFiles<>( + sink, + formatFunction, + computeNumShards, + numShardsProvider, + windowedWrites, maxNumWritersPerBundle); } @@ -254,97 +285,167 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for * more information. */ - public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) { + public WriteFiles<UserT, DestinationT, OutputT> withSharding( + PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) { checkNotNull( sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead"); - return new WriteFiles<>(sink, sharding, null, windowedWrites, maxNumWritersPerBundle); + return new WriteFiles<>( + sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle); } /** * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with * runner-determined sharding. */ - public WriteFiles<T> withRunnerDeterminedSharding() { - return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle); + public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() { + return new WriteFiles<>( + sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle); } /** * Returns a new {@link WriteFiles} that preserves windowing on its input. * - * <p>If this option is not specified, windowing and triggering are replaced by - * {@link GlobalWindows} and {@link DefaultTrigger}. + * <p>If this option is not specified, windowing and triggering are replaced by {@link + * GlobalWindows} and {@link DefaultTrigger}. * - * <p>If there is no data for a window, no output shards will be generated for that window. - * If a window triggers multiple times, then more than a single output shard might be - * generated multiple times; it's up to the sink implementation to keep these output shards - * unique. + * <p>If there is no data for a window, no output shards will be generated for that window. If a + * window triggers multiple times, then more than a single output shard might be generated + * multiple times; it's up to the sink implementation to keep these output shards unique. + * - * <p>This option can only be used if {@link #withNumShards(int)} is also set to a - * positive value. + * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
*/ - public WriteFiles<T> withWindowedWrites() { - return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true, - maxNumWritersPerBundle); + public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() { + return new WriteFiles<>( + sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle); + } + + private static class WriterKey<DestinationT> { + private final BoundedWindow window; + private final PaneInfo paneInfo; + private final DestinationT destination; + + WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) { + this.window = window; + this.paneInfo = paneInfo; + this.destination = destination; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof WriterKey)) { + return false; + } + WriterKey other = (WriterKey) o; + return Objects.equal(window, other.window) + && Objects.equal(paneInfo, other.paneInfo) + && Objects.equal(destination, other.destination); + } + + @Override + public int hashCode() { + return Objects.hashCode(window, paneInfo, destination); + } + } + + // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's + // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination + // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so + // this can be used as a key. + private static <DestinationT> int hashDestination( + DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException { + return Hashing.murmur3_32() + .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination)) + .asInt(); } /** - * Writes all the elements in a bundle using a {@link Writer} produced by the - * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled. + * Writes all the elements in a bundle using a {@link Writer} produced by the {@link + * WriteOperation} associated with the {@link FileBasedSink}. */ - private class WriteWindowedBundles extends DoFn<T, FileResult> { - private final TupleTag<KV<Integer, T>> unwrittedRecordsTag; - private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters; - int spilledShardNum = UNKNOWN_SHARDNUM; - - WriteWindowedBundles(TupleTag<KV<Integer, T>> unwrittedRecordsTag) { - this.unwrittedRecordsTag = unwrittedRecordsTag; + private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> { + private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag; + private final Coder<DestinationT> destinationCoder; + private final boolean windowedWrites; + + private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers; + private int spilledShardNum = UNKNOWN_SHARDNUM; + + WriteBundles( + boolean windowedWrites, + TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag, + Coder<DestinationT> destinationCoder) { + this.windowedWrites = windowedWrites; + this.unwrittenRecordsTag = unwrittenRecordsTag; + this.destinationCoder = destinationCoder; } @StartBundle public void startBundle(StartBundleContext c) { // Reset state in case of reuse. We need to make sure that each bundle gets unique writers. - windowedWriters = Maps.newHashMap(); + writers = Maps.newHashMap(); } @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { PaneInfo paneInfo = c.pane(); - Writer<T> writer; // If we are doing windowed writes, we need to ensure that we have separate files for - // data in different windows/panes. 
- KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo); - writer = windowedWriters.get(key); + // data in different windows/panes. Similar for dynamic writes, make sure that different + // destinations go to different writers. + // In the case of unwindowed writes, the window and the pane will always be the same, and + // the map will only have a single element. + DestinationT destination = sink.getDynamicDestinations().getDestination(c.element()); + WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination); + Writer<OutputT, DestinationT> writer = writers.get(key); if (writer == null) { - if (windowedWriters.size() <= maxNumWritersPerBundle) { + if (writers.size() <= maxNumWritersPerBundle) { String uuid = UUID.randomUUID().toString(); LOG.info( - "Opening writer {} for write operation {}, window {} pane {}", + "Opening writer {} for write operation {}, window {} pane {} destination {}", uuid, writeOperation, window, - paneInfo); + paneInfo, + destination); writer = writeOperation.createWriter(); - writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM); - windowedWriters.put(key, writer); + if (windowedWrites) { + writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination); + } else { + writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination); + } + writers.put(key, writer); LOG.debug("Done opening writer"); } else { if (spilledShardNum == UNKNOWN_SHARDNUM) { + // Cache the random value so we only call ThreadLocalRandom once per DoFn instance. spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR); } else { spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR; } - c.output(unwrittedRecordsTag, KV.of(spilledShardNum, c.element())); + c.output( + unwrittenRecordsTag, + KV.of( + ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum), + c.element())); return; } } - writeOrClose(writer, c.element()); + writeOrClose(writer, formatFunction.apply(c.element())); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry : windowedWriters.entrySet()) { - FileResult result = entry.getValue().close(); - BoundedWindow window = entry.getKey().getKey(); + for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry : + writers.entrySet()) { + Writer<OutputT, DestinationT> writer = entry.getValue(); + FileResult<DestinationT> result; + try { + result = writer.close(); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); + throw e; + } + BoundedWindow window = entry.getKey().window; c.output(result, window.maxTimestamp(), window); } } @@ -355,90 +456,62 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } } - /** - * Writes all the elements in a bundle using a {@link Writer} produced by the - * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes disabled. - */ - private class WriteUnwindowedBundles extends DoFn<T, FileResult> { - // Writer that will write the records in this bundle. Lazily - // initialized in processElement. - private Writer<T> writer = null; - private BoundedWindow window = null; - - @StartBundle - public void startBundle(StartBundleContext c) { - // Reset state in case of reuse. We need to make sure that each bundle gets unique writers. 
- writer = null; - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - // Cache a single writer for the bundle. - if (writer == null) { - LOG.info("Opening writer for write operation {}", writeOperation); - writer = writeOperation.createWriter(); - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); - LOG.debug("Done opening writer"); - } - this.window = window; - writeOrClose(this.writer, c.element()); - } + enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING } - @FinishBundle - public void finishBundle(FinishBundleContext c) throws Exception { - if (writer == null) { - return; - } - FileResult result = writer.close(); - c.output(result, window.maxTimestamp(), window); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(WriteFiles.this); - } - } - - enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }; - - /** - * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements - * for each shard have been collected into a single iterable. + /* + * Like {@link WriteBundles}, but where the elements for each shard have been collected into a + * single iterable. */ - private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> { + private class WriteShardedBundles + extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> { ShardAssignment shardNumberAssignment; WriteShardedBundles(ShardAssignment shardNumberAssignment) { this.shardNumberAssignment = shardNumberAssignment; } + @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - // In a sharded write, single input element represents one shard. We can open and close - // the writer in each call to processElement. - LOG.info("Opening writer for write operation {}", writeOperation); - Writer<T> writer = writeOperation.createWriter(); - if (windowedWrites) { - int shardNumber = shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING - ? c.element().getKey() : UNKNOWN_SHARDNUM; - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), shardNumber); - } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); - } - LOG.debug("Done opening writer"); - - try { - for (T t : c.element().getValue()) { - writeOrClose(writer, t); + // Since we key by a 32-bit hash of the destination, there might be multiple destinations + // in this iterable. The number of destinations is generally very small (1000s or less), so + // there will rarely be hash collisions. + Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap(); + for (UserT input : c.element().getValue()) { + DestinationT destination = sink.getDynamicDestinations().getDestination(input); + Writer<OutputT, DestinationT> writer = writers.get(destination); + if (writer == null) { + LOG.debug("Opening writer for write operation {}", writeOperation); + writer = writeOperation.createWriter(); + if (windowedWrites) { + int shardNumber = + shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING + ? 
c.element().getKey().getShardNumber() + : UNKNOWN_SHARDNUM; + writer.openWindowed( + UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination); + } + LOG.debug("Done opening writer"); + writers.put(destination, writer); + } + writeOrClose(writer, formatFunction.apply(input)); } - // Close the writer; if this throws let the error propagate. - FileResult result = writer.close(); - c.output(result); - } catch (Exception e) { - // If anything goes wrong, make sure to delete the temporary file. - writer.cleanup(); - throw e; + // Close all writers. + for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) { + Writer<OutputT, DestinationT> writer = entry.getValue(); + FileResult<DestinationT> result; + try { + // Close the writer; if this throws let the error propagate. + result = writer.close(); + c.output(result); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); + throw e; + } + } } - } @Override public void populateDisplayData(DisplayData.Builder builder) { @@ -446,12 +519,15 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } } - private static <T> void writeOrClose(Writer<T> writer, T t) throws Exception { + private static <OutputT, DestinationT> void writeOrClose( + Writer<OutputT, DestinationT> writer, OutputT t) throws Exception { try { writer.write(t); } catch (Exception e) { try { writer.close(); + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); } catch (Exception closeException) { if (closeException instanceof InterruptedException) { // Do not silently ignore interrupted state. @@ -464,20 +540,25 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } } - private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> { + private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> { private final PCollectionView<Integer> numShardsView; private final ValueProvider<Integer> numShardsProvider; + private final Coder<DestinationT> destinationCoder; + private int shardNumber; - ApplyShardingKey(PCollectionView<Integer> numShardsView, - ValueProvider<Integer> numShardsProvider) { + ApplyShardingKey( + PCollectionView<Integer> numShardsView, + ValueProvider<Integer> numShardsProvider, + Coder<DestinationT> destinationCoder) { + this.destinationCoder = destinationCoder; this.numShardsView = numShardsView; this.numShardsProvider = numShardsProvider; shardNumber = UNKNOWN_SHARDNUM; } @ProcessElement - public void processElement(ProcessContext context) { + public void processElement(ProcessContext context) throws IOException { final int shardCount; if (numShardsView != null) { shardCount = context.sideInput(numShardsView); @@ -497,86 +578,110 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } else { shardNumber = (shardNumber + 1) % shardCount; } - context.output(KV.of(shardNumber, context.element())); + // We avoid using destination itself as a sharding key, because destination is often large. + // e.g. when using {@link DefaultFilenamePolicy}, the destination contains the entire path + // to the file. Often most of the path is constant across all destinations, just the path + // suffix is appended by the destination function. 
Instead we key by a 32-bit hash (carefully + // chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve + // the destinations. This does mean that multiple destinations might end up on the same shard, + // however the number of collisions should be small, so there's no need to worry about memory + // issues. + DestinationT destination = sink.getDynamicDestinations().getDestination(context.element()); + context.output( + KV.of( + ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber), + context.element())); } } /** * A write is performed as a sequence of three {@link ParDo}'s. * - * <p>This singleton collection containing the WriteOperation is then used as a side - * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase, - * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. - * {@link Writer#open} and {@link Writer#close} are called in - * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and - * {@link Writer#write} method is called for every element in the bundle. The output - * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link FileBasedSink} - * for a description of writer results)-one for each bundle. + * <p>This singleton collection containing the WriteOperation is then used as a side input to a + * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link + * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and + * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, + * respectively, and the {@link Writer#write} method is called for every element in the bundle. The + * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link + * FileBasedSink} for a description of writer results), one for each bundle. * * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer - * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called - * to finalize the write. + * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize + * the write. * - * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be - * called before the exception that caused the write to fail is propagated and the write result - * will be discarded. + * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called + * before the exception that caused the write to fail is propagated and the write result will be + * discarded. * * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and * deserialized in the bundle-writing and finalization phases, any state change to the - * WriteOperation object that occurs during initialization is visible in the latter - * phases. However, the WriteOperation is not serialized after the bundle-writing - * phase. This is why implementations should guarantee that - * {@link WriteOperation#createWriter} does not mutate WriteOperation). + * WriteOperation object that occurs during initialization is visible in the latter phases. + * However, the WriteOperation is not serialized after the bundle-writing phase. This is why + * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate + * WriteOperation.
*/ - private PDone createWrite(PCollection<T> input) { + private PDone createWrite(PCollection<UserT> input) { Pipeline p = input.getPipeline(); if (!windowedWrites) { // Re-window the data into the global window and remove any existing triggers. input = input.apply( - Window.<T>into(new GlobalWindows()) + Window.<UserT>into(new GlobalWindows()) .triggering(DefaultTrigger.of()) .discardingFiredPanes()); } - // Perform the per-bundle writes as a ParDo on the input PCollection (with the // WriteOperation as a side input) and collect the results of the writes in a // PCollection. There is a dependency between this ParDo and the first (the // WriteOperation PCollection as a side input), so this will happen after the // initial ParDo. - PCollection<FileResult> results; + PCollection<FileResult<DestinationT>> results; final PCollectionView<Integer> numShardsView; @SuppressWarnings("unchecked") Coder<BoundedWindow> shardedWindowCoder = (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); + final Coder<DestinationT> destinationCoder; + try { + destinationCoder = + sink.getDynamicDestinations() + .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()); + destinationCoder.verifyDeterministic(); + } catch (CannotProvideCoderException | NonDeterministicException e) { + throw new RuntimeException(e); + } + if (computeNumShards == null && numShardsProvider == null) { numShardsView = null; - if (windowedWrites) { - TupleTag<FileResult> writtenRecordsTag = new TupleTag<>("writtenRecordsTag"); - TupleTag<KV<Integer, T>> unwrittedRecordsTag = new TupleTag<>("unwrittenRecordsTag"); - PCollectionTuple writeTuple = input.apply("WriteWindowedBundles", ParDo.of( - new WriteWindowedBundles(unwrittedRecordsTag)) - .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag))); - PCollection<FileResult> writtenBundleFiles = writeTuple.get(writtenRecordsTag) - .setCoder(FileResultCoder.of(shardedWindowCoder)); - // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in - // finalize to stay consistent with what WriteWindowedBundles does. - PCollection<FileResult> writtenGroupedFiles = - writeTuple - .get(unwrittedRecordsTag) - .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder())) - .apply("GroupUnwritten", GroupByKey.<Integer, T>create()) - .apply("WriteUnwritten", ParDo.of( - new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))) - .setCoder(FileResultCoder.of(shardedWindowCoder)); - results = PCollectionList.of(writtenBundleFiles).and(writtenGroupedFiles) - .apply(Flatten.<FileResult>pCollections()); - } else { - results = - input.apply("WriteUnwindowedBundles", ParDo.of(new WriteUnwindowedBundles())); - } + TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecordsTag"); + TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag = + new TupleTag<>("unwrittenRecordsTag"); + String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles"; + PCollectionTuple writeTuple = + input.apply( + writeName, + ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder)) + .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag))); + PCollection<FileResult<DestinationT>> writtenBundleFiles = + writeTuple + .get(writtenRecordsTag) + .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); + // Any "spilled" elements are written using WriteShardedBundles. 
Assign shard numbers in + // finalize to stay consistent with what WriteWindowedBundles does. + PCollection<FileResult<DestinationT>> writtenGroupedFiles = + writeTuple + .get(unwrittedRecordsTag) + .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) + .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create()) + .apply( + "WriteUnwritten", + ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))) + .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); + results = + PCollectionList.of(writtenBundleFiles) + .and(writtenGroupedFiles) + .apply(Flatten.<FileResult<DestinationT>>pCollections()); } else { List<PCollectionView<?>> sideInputs = Lists.newArrayList(); if (computeNumShards != null) { @@ -585,23 +690,31 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } else { numShardsView = null; } - - PCollection<KV<Integer, Iterable<T>>> sharded = + PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded = input - .apply("ApplyShardLabel", ParDo.of( - new ApplyShardingKey<T>(numShardsView, - (numShardsView != null) ? null : numShardsProvider)) - .withSideInputs(sideInputs)) - .apply("GroupIntoShards", GroupByKey.<Integer, T>create()); + .apply( + "ApplyShardLabel", + ParDo.of( + new ApplyShardingKey( + numShardsView, + (numShardsView != null) ? null : numShardsProvider, + destinationCoder)) + .withSideInputs(sideInputs)) + .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) + .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()); + shardedWindowCoder = + (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder(); // Since this path might be used by streaming runners processing triggers, it's important // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE // strategy works by sorting all FileResult objects and assigning them numbers, which is not // guaranteed to work well when processing triggers - if the finalize step retries it might // see a different Iterable of FileResult objects, and it will assign different shard numbers. - results = sharded.apply("WriteShardedBundles", - ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))); + results = + sharded.apply( + "WriteShardedBundles", + ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))); } - results.setCoder(FileResultCoder.of(shardedWindowCoder)); + results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); if (windowedWrites) { // When processing streaming windowed writes, results will arrive multiple times. This @@ -609,26 +722,31 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { // as new data arriving into a side input does not trigger the listening DoFn. Instead // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered // whenever new data arrives. - PCollection<KV<Void, FileResult>> keyedResults = - results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null)); - keyedResults.setCoder(KvCoder.of(VoidCoder.of(), - FileResultCoder.of(shardedWindowCoder))); + PCollection<KV<Void, FileResult<DestinationT>>> keyedResults = + results.apply( + "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null)); + keyedResults.setCoder( + KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder))); // Is the continuation trigger sufficient? 
keyedResults - .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create()) - .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("Finalizing write operation {}.", writeOperation); - List<FileResult> results = Lists.newArrayList(c.element().getValue()); - writeOperation.finalize(results); - LOG.debug("Done finalizing write operation"); - } - })); + .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create()) + .apply( + "Finalize", + ParDo.of( + new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info("Finalizing write operation {}.", writeOperation); + List<FileResult<DestinationT>> results = + Lists.newArrayList(c.element().getValue()); + writeOperation.finalize(results); + LOG.debug("Done finalizing write operation"); + } + })); } else { - final PCollectionView<Iterable<FileResult>> resultsView = - results.apply(View.<FileResult>asIterable()); + final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = + results.apply(View.<FileResult<DestinationT>>asIterable()); ImmutableList.Builder<PCollectionView<?>> sideInputs = ImmutableList.<PCollectionView<?>>builder().add(resultsView); if (numShardsView != null) { @@ -644,41 +762,53 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { // set numShards, then all shards will be written out as empty files. For this reason we // use a side input here. PCollection<Void> singletonCollection = p.apply(Create.of((Void) null)); - singletonCollection - .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("Finalizing write operation {}.", writeOperation); - List<FileResult> results = Lists.newArrayList(c.sideInput(resultsView)); - LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); - - // We must always output at least 1 shard, and honor user-specified numShards if - // set. 
- int minShardsNeeded; - if (numShardsView != null) { - minShardsNeeded = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - minShardsNeeded = numShardsProvider.get(); - } else { - minShardsNeeded = 1; - } - int extraShardsNeeded = minShardsNeeded - results.size(); - if (extraShardsNeeded > 0) { - LOG.info( - "Creating {} empty output shards in addition to {} written for a total of {}.", - extraShardsNeeded, results.size(), minShardsNeeded); - for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<T> writer = writeOperation.createWriter(); - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); - FileResult emptyWrite = writer.close(); - results.add(emptyWrite); - } - LOG.debug("Done creating extra shards."); - } - writeOperation.finalize(results); - LOG.debug("Done finalizing write operation {}", writeOperation); - } - }).withSideInputs(sideInputs.build())); + singletonCollection.apply( + "Finalize", + ParDo.of( + new DoFn<Void, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info("Finalizing write operation {}.", writeOperation); + List<FileResult<DestinationT>> results = + Lists.newArrayList(c.sideInput(resultsView)); + LOG.debug( + "Side input initialized to finalize write operation {}.", writeOperation); + + // We must always output at least 1 shard, and honor user-specified numShards + // if + // set. + int minShardsNeeded; + if (numShardsView != null) { + minShardsNeeded = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + minShardsNeeded = numShardsProvider.get(); + } else { + minShardsNeeded = 1; + } + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written " + + "for a total of {}.", + extraShardsNeeded, + results.size(), + minShardsNeeded); + for (int i = 0; i < extraShardsNeeded; ++i) { + Writer<OutputT, DestinationT> writer = writeOperation.createWriter(); + writer.openUnwindowed( + UUID.randomUUID().toString(), + UNKNOWN_SHARDNUM, + sink.getDynamicDestinations().getDefaultDestination()); + FileResult<DestinationT> emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards."); + } + writeOperation.finalize(results); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + }) + .withSideInputs(sideInputs.build())); } return PDone.in(input.getPipeline()); } http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java new file mode 100644 index 0000000..d057d81 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.transforms; + +/** Useful {@link SerializableFunction} overrides. */ +public class SerializableFunctions { + private static class Identity<T> implements SerializableFunction<T, T> { + @Override + public T apply(T input) { + return input; + } + } + + private static class Constant<InT, OutT> implements SerializableFunction<InT, OutT> { + OutT value; + + Constant(OutT value) { + this.value = value; + } + + @Override + public OutT apply(InT input) { + return value; + } + } + + public static <T> SerializableFunction<T, T> identity() { + return new Identity<>(); + } + + public static <InT, OutT> SerializableFunction<InT, OutT> constant(OutT value) { + return new Constant<>(value); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java new file mode 100644 index 0000000..e56af13 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.values; + +import java.io.Serializable; +import java.util.Objects; + +/** A key and a shard number. 
*/ +public class ShardedKey<K> implements Serializable { + private static final long serialVersionUID = 1L; + private final K key; + private final int shardNumber; + + public static <K> ShardedKey<K> of(K key, int shardNumber) { + return new ShardedKey<>(key, shardNumber); + } + + private ShardedKey(K key, int shardNumber) { + this.key = key; + this.shardNumber = shardNumber; + } + + public K getKey() { + return key; + } + + public int getShardNumber() { + return shardNumber; + } + + @Override + public String toString() { + return "key: " + key + " shard: " + shardNumber; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ShardedKey)) { + return false; + } + ShardedKey<K> other = (ShardedKey<K>) o; + return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber); + } + + @Override + public int hashCode() { + return Objects.hash(key, shardNumber); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 6d01d32..260e47a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -54,10 +54,11 @@ import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -276,37 +277,42 @@ public class AvroIOTest { } private static class WindowedFilenamePolicy extends FilenamePolicy { - final String outputFilePrefix; + final ResourceId outputFilePrefix; - WindowedFilenamePolicy(String outputFilePrefix) { + WindowedFilenamePolicy(ResourceId outputFilePrefix) { this.outputFilePrefix = outputFilePrefix; } @Override - public ResourceId windowedFilename( - ResourceId outputDirectory, WindowedContext input, String extension) { - String filename = String.format( - "%s-%s-%s-of-%s-pane-%s%s%s", - outputFilePrefix, - input.getWindow(), - input.getShardNumber(), - input.getNumShards() - 1, - input.getPaneInfo().getIndex(), - input.getPaneInfo().isLast() ? "-final" : "", - extension); - return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) { + String filenamePrefix = + outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), ""); + + String filename = + String.format( + "%s-%s-%s-of-%s-pane-%s%s%s", + filenamePrefix, + input.getWindow(), + input.getShardNumber(), + input.getNumShards() - 1, + input.getPaneInfo().getIndex(), + input.getPaneInfo().isLast() ? 
"-final" : "", + outputFileHints.getSuggestedFilenameSuffix()); + return outputFilePrefix + .getCurrentDirectory() + .resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename( - ResourceId outputDirectory, Context input, String extension) { + public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Expecting windowed outputs only"); } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix) - .withLabel("File Name Prefix")); + builder.add( + DisplayData.item("fileNamePrefix", outputFilePrefix.toString()) + .withLabel("File Name Prefix")); } } @@ -359,15 +365,18 @@ public class AvroIOTest { Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length)) .advanceWatermarkToInfinity(); - FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename); + FilenamePolicy policy = + new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename)); windowedAvroWritePipeline .apply(values) .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1)))) - .apply(AvroIO.write(GenericClass.class) - .to(baseFilename) - .withFilenamePolicy(policy) - .withWindowedWrites() - .withNumShards(2)); + .apply( + AvroIO.write(GenericClass.class) + .to(policy) + .withTempDirectory( + StaticValueProvider.of(FileSystems.matchNewResource(baseDir.toString(), true))) + .withWindowedWrites() + .withNumShards(2)); windowedAvroWritePipeline.run(); // Validate that the data written matches the expected elements in the expected order @@ -494,13 +503,14 @@ public class AvroIOTest { expectedFiles.add( new File( DefaultFilenamePolicy.constructName( - outputFilePrefix, - shardNameTemplate, - "" /* no suffix */, - i, - numShards, - null, - null))); + FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix), + shardNameTemplate, + "" /* no suffix */, + i, + numShards, + null, + null) + .toString())); } List<String> actualElements = new ArrayList<>(); @@ -572,15 +582,4 @@ public class AvroIOTest { assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString())); } - - @Test - public void testWindowedWriteRequiresFilenamePolicy() { - PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of())); - AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites(); - - expectedException.expect(IllegalStateException.class); - expectedException.expectMessage( - "When using windowed writes, a filename policy must be set via withFilenamePolicy()"); - emptyInput.apply(write); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java index 217420c..9dc6d33 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -17,9 +17,9 @@ */ package org.apache.beam.sdk.io; -import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName; import static org.junit.Assert.assertEquals; +import 
org.apache.beam.sdk.io.fs.ResourceId;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -30,69 +30,108 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class DefaultFilenamePolicyTest {
+  private static String constructName(
+      String baseFilename,
+      String shardTemplate,
+      String suffix,
+      int shardNum,
+      int numShards,
+      String paneStr,
+      String windowStr) {
+    ResourceId constructed =
+        DefaultFilenamePolicy.constructName(
+            FileSystems.matchNewResource(baseFilename, false),
+            shardTemplate,
+            suffix,
+            shardNum,
+            numShards,
+            paneStr,
+            windowStr);
+    return constructed.toString();
+  }
+
   @Test
   public void testConstructName() {
-    assertEquals("output-001-of-123.txt",
-        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+    assertEquals(
+        "/path/to/output-001-of-123.txt",
+        constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
 
-    assertEquals("out.txt/part-00042",
-        constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null));
+    assertEquals(
+        "/path/to/out.txt/part-00042",
+        constructName("/path/to/out.txt", "/part-SSSSS", "", 42, 100, null, null));
 
-    assertEquals("out.txt",
-        constructName("ou", "t.t", "xt", 1, 1, null, null));
+    assertEquals("/path/to/out.txt", constructName("/path/to/ou", "t.t", "xt", 1, 1, null, null));
 
-    assertEquals("out0102shard.txt",
-        constructName("out", "SSNNshard", ".txt", 1, 2, null, null));
+    assertEquals(
+        "/path/to/out0102shard.txt",
+        constructName("/path/to/out", "SSNNshard", ".txt", 1, 2, null, null));
 
-    assertEquals("out-2/1.part-1-of-2.txt",
-        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
+    assertEquals(
+        "/path/to/out-2/1.part-1-of-2.txt",
+        constructName("/path/to/out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
   }
 
   @Test
   public void testConstructNameWithLargeShardCount() {
-    assertEquals("out-100-of-5000.txt",
-        constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null));
+    assertEquals(
+        "/out-100-of-5000.txt", constructName("/out", "-SS-of-NN", ".txt", 100, 5000, null, null));
   }
 
   @Test
   public void testConstructWindowedName() {
-    assertEquals("output-001-of-123.txt",
-        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
-
-    assertEquals("output-001-of-123-PPP-W.txt",
-        constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
-
-    assertEquals("out.txt/part-00042-myPaneStr-myWindowStr",
-        constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr",
-            "myWindowStr"));
-
-    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2",
-        "anotherWindowStr"));
-
-    assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
-        constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr",
-            "oneMoreWindowStr"));
-
-    assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
-        + "panemyPaneStr3.txt",
-        constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3",
-            "slidingWindow1"));
+    assertEquals(
+        "/path/to/output-001-of-123.txt",
+        constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+
+    assertEquals(
+        "/path/to/output-001-of-123-PPP-W.txt",
+        constructName("/path/to/output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
+
+    assertEquals(
+        "/path/to/out" + ".txt/part-00042-myPaneStr-myWindowStr",
+        constructName(
+            "/path/to/out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", "myWindowStr"));
+
+    assertEquals(
+        "/path/to/out.txt",
+        constructName("/path/to/ou", "t.t", "xt", 1, 1, "myPaneStr2", "anotherWindowStr"));
+
+    assertEquals(
+        "/path/to/out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
+        constructName(
+            "/path/to/out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", "oneMoreWindowStr"));
+
+    assertEquals(
+        "/out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
+            + "panemyPaneStr3.txt",
+        constructName(
+            "/out",
+            "-N/S.part-S-of-N-W-P-windowW-paneP",
+            ".txt",
+            1,
+            2,
+            "myPaneStr3",
+            "slidingWindow1"));
 
     // test first/last pane
-    assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false",
-        constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false",
-            "myWindowStr"));
-
-    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane",
-        "anotherWindowStr"));
-
-    assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
-        constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false",
-            "oneMoreWindowStr"));
-
-    assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
-        constructName("out",
-            "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
+    assertEquals(
+        "/out.txt/part-00042-myWindowStr-pane-11-true-false",
+        constructName(
+            "/out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", "myWindowStr"));
+
+    assertEquals(
+        "/path/to/out.txt",
+        constructName("/path/to/ou", "t.t", "xt", 1, 1, "pane", "anotherWindowStr"));
+
+    assertEquals(
+        "/out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
+        constructName(
+            "/out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", "oneMoreWindowStr"));
+
+    assertEquals(
+        "/path/to/out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
+        constructName(
+            "/path/to/out", "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
index 6615a2e..a7644b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
@@ -39,7 +39,7 @@ public class DrunkWritableByteChannelFactory implements WritableByteChannelFacto
   }
 
   @Override
-  public String getFilenameSuffix() {
+  public String getSuggestedFilenameSuffix() {
     return ".drunk";
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index caad759..755bb59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -103,7 +103,7 @@ public class FileBasedSinkTest {
     SimpleSink.SimpleWriter writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
 
-    writer.openUnwindowed(testUid, -1);
+    writer.openUnwindowed(testUid, -1, null);
     for (String value : values) {
       writer.write(value);
     }
@@ -198,23 +198,27 @@ public class FileBasedSinkTest {
       throws Exception {
     int numFiles = temporaryFiles.size();
-    List<FileResult> fileResults = new ArrayList<>();
+    List<FileResult<Void>> fileResults = new ArrayList<>();
     // Create temporary output bundles and output File objects.
     for (int i = 0; i < numFiles; i++) {
       fileResults.add(
-          new FileResult(
+          new FileResult<Void>(
               LocalResources.fromFile(temporaryFiles.get(i), false),
              WriteFiles.UNKNOWN_SHARDNUM,
              null,
+             null,
              null));
     }
 
     writeOp.finalize(fileResults);
-    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
     for (int i = 0; i < numFiles; i++) {
-      ResourceId outputFilename = writeOp.getSink().getFilenamePolicy()
-          .unwindowedFilename(outputDirectory, new Context(i, numFiles), "");
+      ResourceId outputFilename =
+          writeOp
+              .getSink()
+              .getDynamicDestinations()
+              .getFilenamePolicy(null)
+              .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED);
       assertTrue(new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
@@ -231,11 +235,12 @@ public class FileBasedSinkTest {
   private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
       throws Exception {
     String prefix = "file";
-    SimpleSink sink =
-        new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(
+            getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
 
-    WriteOperation<String> writeOp =
-        new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
+    WriteOperation<String, Void> writeOp =
+        new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
@@ -272,8 +277,6 @@ public class FileBasedSinkTest {
   @Test
   public void testCopyToOutputFiles() throws Exception {
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
-    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
-
     List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
     List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames = Arrays.asList(
@@ -292,9 +295,14 @@ public class FileBasedSinkTest {
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
       List<String> lines = Collections.singletonList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false),
-          writeOp.getSink().getFilenamePolicy()
-              .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), ""));
+      inputFilePaths.put(
+          LocalResources.fromFile(inputTmpFile, false),
+          writeOp
+              .getSink()
+              .getDynamicDestinations()
+              .getFilenamePolicy(null)
+              .unwindowedFilename(
+                  new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED));
     }
 
     // Copy input files to output files.
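Taken together, the test updates above trace the new shape of filename resolution: a FilenamePolicy is now obtained per destination from the sink's DynamicDestinations, and unwindowedFilename() takes shard context plus output-file hints instead of a base directory and a bare extension string. A minimal sketch of the resulting call sequence, composed only from calls that appear in this diff (the base directory, prefix, and shard numbers below are hypothetical):

    // Hypothetical setup mirroring the test helpers in this file.
    ResourceId baseDirectory =
        FileSystems.matchNewResource("/path/to", true /* isDirectory */);
    SimpleSink<Void> sink =
        SimpleSink.makeSimpleSink(
            baseDirectory, "file", "-SSSSS-of-NNNNN", ".txt", CompressionType.UNCOMPRESSED);

    // The policy now hangs off DynamicDestinations; null selects the single
    // default destination these tests use.
    FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);

    // unwindowedFilename() no longer takes the output directory or a suffix
    // string: the base filename is baked into the policy, and the second
    // argument (here a CompressionType) supplies the suggested suffix.
    ResourceId shardFile =
        policy.unwindowedFilename(new Context(3, 100), CompressionType.UNCOMPRESSED);
    // shardFile resolves to something like /path/to/file-00003-of-00100.txt
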
@@ -311,7 +319,8 @@ public class FileBasedSinkTest {
       ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
     List<ResourceId> filenames = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), ""));
+      filenames.add(
+          policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED));
     }
     return filenames;
   }
@@ -326,8 +335,10 @@ public class FileBasedSinkTest {
     List<ResourceId> actual;
 
     ResourceId root = getBaseOutputDirectory();
-    SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test");
-    FilenamePolicy policy = sink.getFilenamePolicy();
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(
+            root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
+    FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
     expected = Arrays.asList(
         root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
@@ -352,8 +363,9 @@
   @Test
   public void testCollidingOutputFilenames() throws IOException {
     ResourceId root = getBaseOutputDirectory();
-    SimpleSink sink = new SimpleSink(root, "file", "-NN", "test");
-    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED);
+    SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
 
     ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
     ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
@@ -361,11 +373,11 @@
     ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
-      Iterable<FileResult> results =
+      Iterable<FileResult<Void>> results =
           Lists.newArrayList(
-              new FileResult(temp1, 1, null, null),
-              new FileResult(temp2, 1, null, null),
-              new FileResult(temp3, 1, null, null));
+              new FileResult<Void>(temp1, 1, null, null, null),
+              new FileResult<Void>(temp2, 1, null, null, null),
+              new FileResult<Void>(temp3, 1, null, null, null));
       writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
     } catch (IllegalStateException exn) {
@@ -379,8 +391,10 @@
     List<ResourceId> expected;
     List<ResourceId> actual;
     ResourceId root = getBaseOutputDirectory();
-    SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", "");
-    FilenamePolicy policy = sink.getFilenamePolicy();
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(
+            root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
+    FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
     expected = Arrays.asList(
         root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
@@ -486,10 +500,11 @@
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
     ResourceId root = getBaseOutputDirectory();
-    WriteOperation<String> writeOp =
-        new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
-            .createWriteOperation();
+    WriteOperation<String, Void> writeOp =
+        SimpleSink.makeSimpleSink(
+                root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
+            .createWriteOperation();
-    final Writer<String> writer = writeOp.createWriter();
+    final Writer<String, Void> writer = writeOp.createWriter();
 
     final ResourceId expectedFile =
         writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
@@ -503,7 +518,7 @@
     expected.add("footer");
     expected.add("footer");
 
-    writer.openUnwindowed(testUid, -1);
+    writer.openUnwindowed(testUid, -1, null);
     writer.write("a");
     writer.write("b");
     final FileResult result = writer.close();
@@ -513,20 +528,20 @@
   }
 
   /** Build a SimpleSink with default options. */
-  private SimpleSink buildSink() {
-    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
+  private SimpleSink<Void> buildSink() {
+    return SimpleSink.makeSimpleSink(
+        getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED);
   }
 
-  /**
-   * Build a SimpleWriteOperation with default options and the given temporary directory.
-   */
-  private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) {
-    SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
+  /** Build a SimpleWriteOperation with default options and the given temporary directory. */
+  private SimpleSink.SimpleWriteOperation<Void> buildWriteOperationWithTempDir(
+      ResourceId tempDirectory) {
+    SimpleSink<Void> sink = buildSink();
+    return new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
   }
 
   /** Build a write operation with the default options for it and its parent sink. */
-  private SimpleSink.SimpleWriteOperation buildWriteOperation() {
+  private SimpleSink.SimpleWriteOperation<Void> buildWriteOperation() {
     return buildSink().createWriteOperation();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index bdf37f6..9196178 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -19,37 +19,55 @@ package org.apache.beam.sdk.io;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /**
- * A simple {@link FileBasedSink} that writes {@link String} values as lines with
- * header and footer.
+ * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer.
  */
-class SimpleSink extends FileBasedSink<String> {
-  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) {
-    this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED);
+class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
+  public SimpleSink(
+      ResourceId tempDirectory,
+      DynamicDestinations<String, DestinationT> dynamicDestinations,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
   }
 
-  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix,
-      WritableByteChannelFactory writableByteChannelFactory) {
-    super(
-        StaticValueProvider.of(baseOutputDirectory),
-        new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix),
-        writableByteChannelFactory);
+  public static SimpleSink<Void> makeSimpleSink(
+      ResourceId tempDirectory, FilenamePolicy filenamePolicy) {
+    return new SimpleSink<>(
+        tempDirectory,
+        DynamicFileDestinations.<String>constant(filenamePolicy),
+        CompressionType.UNCOMPRESSED);
   }
 
-  public SimpleSink(ResourceId baseOutputDirectory, FilenamePolicy filenamePolicy) {
-    super(StaticValueProvider.of(baseOutputDirectory), filenamePolicy);
+  public static SimpleSink<Void> makeSimpleSink(
+      ResourceId baseDirectory,
+      String prefix,
+      String shardTemplate,
+      String suffix,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    DynamicDestinations<String, Void> dynamicDestinations =
+        DynamicFileDestinations.constant(
+            DefaultFilenamePolicy.fromParams(
+                new Params()
+                    .withBaseFilename(
+                        baseDirectory.resolve(prefix, StandardResolveOptions.RESOLVE_FILE))
+                    .withShardTemplate(shardTemplate)
+                    .withSuffix(suffix)));
+    return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory);
   }
 
   @Override
-  public SimpleWriteOperation createWriteOperation() {
-    return new SimpleWriteOperation(this);
+  public SimpleWriteOperation<DestinationT> createWriteOperation() {
+    return new SimpleWriteOperation<>(this);
   }
 
-  static final class SimpleWriteOperation extends WriteOperation<String> {
+  static final class SimpleWriteOperation<DestinationT>
+      extends WriteOperation<String, DestinationT> {
     public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
       super(sink, tempOutputDirectory);
     }
@@ -59,12 +77,12 @@ class SimpleSink extends FileBasedSink<String> {
     }
 
     @Override
-    public SimpleWriter createWriter() throws Exception {
-      return new SimpleWriter(this);
+    public SimpleWriter<DestinationT> createWriter() throws Exception {
+      return new SimpleWriter<>(this);
     }
   }
 
-  static final class SimpleWriter extends Writer<String> {
+  static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> {
     static final String HEADER = "header";
     static final String FOOTER = "footer";