http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 7013044..2fd10ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -20,12 +20,9 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.hash.Hashing; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.UUID; @@ -33,12 +30,8 @@ import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ShardedKeyCoder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; @@ -49,11 +42,9 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -62,17 +53,11 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.ShardedKey; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,12 +66,13 @@ import org.slf4j.LoggerFactory; * global initialization of a sink, followed by a parallel write, and ends with a sequential * finalization of the write. The output of a write is {@link PDone}. 
 *
- * <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link
- * WriteOperation}, so the number of output will vary based on runner behavior, though at least 1
- * output will always be produced. The exact parallelism of the write stage can be controlled using
- * {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to
- * globally limit the number of workers connecting to an external service. However, this option can
- * often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a
+ * {@link WriteOperation}, so the number of outputs
+ * will vary based on runner behavior, though at least 1 output will always be produced. The
+ * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards},
+ * typically used to control how many files are produced or to globally limit the number of
+ * workers connecting to an external service. However, this option can often hurt performance: it
+ * adds an additional {@link GroupByKey} to the pipeline.
 *
 * <p>Example usage with runner-determined sharding:
 *
@@ -97,70 +83,44 @@ import org.slf4j.LoggerFactory;
 * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
 */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public class WriteFiles<UserT, DestinationT, OutputT>
-    extends PTransform<PCollection<UserT>, PDone> {
+public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
-  // The maximum number of file writers to keep open in a single bundle at a time, since file
-  // writers default to 64mb buffers. This comes into play when writing per-window files.
-  // The first 20 files from a single WriteFiles transform will write files inline in the
-  // transform. Anything beyond that might be shuffled.
-  // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
-  // their own policy.
-  private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
-
-  // When we spill records, shard the output keys to prevent hotspots.
-  // We could consider making this a parameter.
-  private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
-
   static final int UNKNOWN_SHARDNUM = -1;
-  private FileBasedSink<OutputT, DestinationT> sink;
-  private SerializableFunction<UserT, OutputT> formatFunction;
-  private WriteOperation<OutputT, DestinationT> writeOperation;
+  private FileBasedSink<T> sink;
+  private WriteOperation<T> writeOperation;
   // This allows the number of shards to be dynamically computed based on the input
   // PCollection.
-  @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
+  @Nullable
+  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
   // We don't use a side input for static sharding, as we want this value to be updatable
   // when a pipeline is updated.
   @Nullable private final ValueProvider<Integer> numShardsProvider;
   private final boolean windowedWrites;
-  private int maxNumWritersPerBundle;
 
   /**
    * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
   * the runner control how many different shards are produced.
*/ - public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to( - FileBasedSink<OutputT, DestinationT> sink, - SerializableFunction<UserT, OutputT> formatFunction) { + public static <T> WriteFiles<T> to(FileBasedSink<T> sink) { checkNotNull(sink, "sink"); - return new WriteFiles<>( - sink, - formatFunction, - null /* runner-determined sharding */, - null, - false, - DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE); + return new WriteFiles<>(sink, null /* runner-determined sharding */, null, false); } private WriteFiles( - FileBasedSink<OutputT, DestinationT> sink, - SerializableFunction<UserT, OutputT> formatFunction, - @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards, + FileBasedSink<T> sink, + @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, @Nullable ValueProvider<Integer> numShardsProvider, - boolean windowedWrites, - int maxNumWritersPerBundle) { + boolean windowedWrites) { this.sink = sink; - this.formatFunction = checkNotNull(formatFunction); this.computeNumShards = computeNumShards; this.numShardsProvider = numShardsProvider; this.windowedWrites = windowedWrites; - this.maxNumWritersPerBundle = maxNumWritersPerBundle; } @Override - public PDone expand(PCollection<UserT> input) { + public PDone expand(PCollection<T> input) { if (input.isBounded() == IsBounded.UNBOUNDED) { checkArgument(windowedWrites, "Must use windowed writes when applying %s to an unbounded PCollection", @@ -199,16 +159,13 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - /** Returns the {@link FileBasedSink} associated with this PTransform. */ - public FileBasedSink<OutputT, DestinationT> getSink() { + /** + * Returns the {@link FileBasedSink} associated with this PTransform. + */ + public FileBasedSink<T> getSink() { return sink; } - /** Returns the the format function that maps the user type to the record written to files. */ - public SerializableFunction<UserT, OutputT> getFormatFunction() { - return formatFunction; - } - /** * Returns whether or not to perform windowed writes. */ @@ -223,7 +180,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> * #withRunnerDeterminedSharding()}. */ @Nullable - public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() { + public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() { return computeNumShards; } @@ -241,7 +198,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> * <p>A value less than or equal to 0 will be equivalent to the default behavior of * runner-determined sharding. */ - public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) { + public WriteFiles<T> withNumShards(int numShards) { if (numShards > 0) { return withNumShards(StaticValueProvider.of(numShards)); } @@ -255,27 +212,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for * more information. */ - public WriteFiles<UserT, DestinationT, OutputT> withNumShards( - ValueProvider<Integer> numShardsProvider) { - return new WriteFiles<>( - sink, - formatFunction, - computeNumShards, - numShardsProvider, - windowedWrites, - maxNumWritersPerBundle); - } - - /** Set the maximum number of writers created in a bundle before spilling to shuffle. 
 */
-  public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
-      int maxNumWritersPerBundle) {
-    return new WriteFiles<>(
-        sink,
-        formatFunction,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle);
+  public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+    return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites);
   }
 
   /**
@@ -285,169 +223,127 @@
    * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
    * more information.
    */
-  public WriteFiles<UserT, DestinationT, OutputT> withSharding(
-      PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
+  public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
     checkNotNull(
         sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new WriteFiles<>(
-        sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle);
+    return new WriteFiles<>(sink, sharding, null, windowedWrites);
   }
 
   /**
    * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
    * runner-determined sharding.
    */
-  public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
-    return new WriteFiles<>(
-        sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle);
+  public WriteFiles<T> withRunnerDeterminedSharding() {
+    return new WriteFiles<>(sink, null, null, windowedWrites);
   }
 
   /**
   * Returns a new {@link WriteFiles} that preserves windowing on its input.
   *
-   * <p>If this option is not specified, windowing and triggering are replaced by {@link
-   * GlobalWindows} and {@link DefaultTrigger}.
+   * <p>If this option is not specified, windowing and triggering are replaced by
+   * {@link GlobalWindows} and {@link DefaultTrigger}.
   *
-   * <p>If there is no data for a window, no output shards will be generated for that window. If a
-   * window triggers multiple times, then more than a single output shard might be generated
-   * multiple times; it's up to the sink implementation to keep these output shards unique.
+   * <p>If there is no data for a window, no output shards will be generated for that window.
+   * If a window triggers multiple times, then more than a single output shard might be
+   * generated multiple times; it's up to the sink implementation to keep these output shards
+   * unique.
   *
-   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
+   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+   * positive value.
   */
-  public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
-    return new WriteFiles<>(
-        sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle);
+  public WriteFiles<T> withWindowedWrites() {
+    return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true);
   }
 
-  private static class WriterKey<DestinationT> {
-    private final BoundedWindow window;
-    private final PaneInfo paneInfo;
-    private final DestinationT destination;
+  /**
+   * Writes all the elements in a bundle using a {@link Writer} produced by the
+   * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
+ */ + private class WriteWindowedBundles extends DoFn<T, FileResult> { + private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters; - WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) { - this.window = window; - this.paneInfo = paneInfo; - this.destination = destination; + @StartBundle + public void startBundle(StartBundleContext c) { + // Reset state in case of reuse. We need to make sure that each bundle gets unique writers. + windowedWriters = Maps.newHashMap(); } - @Override - public boolean equals(Object o) { - if (!(o instanceof WriterKey)) { - return false; + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + PaneInfo paneInfo = c.pane(); + Writer<T> writer; + // If we are doing windowed writes, we need to ensure that we have separate files for + // data in different windows/panes. + KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo); + writer = windowedWriters.get(key); + if (writer == null) { + String uuid = UUID.randomUUID().toString(); + LOG.info( + "Opening writer {} for write operation {}, window {} pane {}", + uuid, + writeOperation, + window, + paneInfo); + writer = writeOperation.createWriter(); + writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM); + windowedWriters.put(key, writer); + LOG.debug("Done opening writer"); } - WriterKey other = (WriterKey) o; - return Objects.equal(window, other.window) - && Objects.equal(paneInfo, other.paneInfo) - && Objects.equal(destination, other.destination); + + writeOrClose(writer, c.element()); } - @Override - public int hashCode() { - return Objects.hashCode(window, paneInfo, destination); + @FinishBundle + public void finishBundle(FinishBundleContext c) throws Exception { + for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry : windowedWriters.entrySet()) { + FileResult result = entry.getValue().close(); + BoundedWindow window = entry.getKey().getKey(); + c.output(result, window.maxTimestamp(), window); + } } - } - // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's - // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination - // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so - // this can be used as a key. - private static <DestinationT> int hashDestination( - DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException { - return Hashing.murmur3_32() - .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination)) - .asInt(); + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(WriteFiles.this); + } } /** - * Writes all the elements in a bundle using a {@link Writer} produced by the {@link - * WriteOperation} associated with the {@link FileBasedSink}. + * Writes all the elements in a bundle using a {@link Writer} produced by the + * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes disabled. 
*/ - private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> { - private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag; - private final Coder<DestinationT> destinationCoder; - private final boolean windowedWrites; - - private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers; - private int spilledShardNum = UNKNOWN_SHARDNUM; - - WriteBundles( - boolean windowedWrites, - TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag, - Coder<DestinationT> destinationCoder) { - this.windowedWrites = windowedWrites; - this.unwrittenRecordsTag = unwrittenRecordsTag; - this.destinationCoder = destinationCoder; - } + private class WriteUnwindowedBundles extends DoFn<T, FileResult> { + // Writer that will write the records in this bundle. Lazily + // initialized in processElement. + private Writer<T> writer = null; + private BoundedWindow window = null; @StartBundle public void startBundle(StartBundleContext c) { // Reset state in case of reuse. We need to make sure that each bundle gets unique writers. - writers = Maps.newHashMap(); + writer = null; } @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - PaneInfo paneInfo = c.pane(); - // If we are doing windowed writes, we need to ensure that we have separate files for - // data in different windows/panes. Similar for dynamic writes, make sure that different - // destinations go to different writers. - // In the case of unwindowed writes, the window and the pane will always be the same, and - // the map will only have a single element. - DestinationT destination = sink.getDynamicDestinations().getDestination(c.element()); - WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination); - Writer<OutputT, DestinationT> writer = writers.get(key); + // Cache a single writer for the bundle. if (writer == null) { - if (writers.size() <= maxNumWritersPerBundle) { - String uuid = UUID.randomUUID().toString(); - LOG.info( - "Opening writer {} for write operation {}, window {} pane {} destination {}", - uuid, - writeOperation, - window, - paneInfo, - destination); - writer = writeOperation.createWriter(); - if (windowedWrites) { - writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination); - } else { - writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination); - } - writers.put(key, writer); - LOG.debug("Done opening writer"); - } else { - if (spilledShardNum == UNKNOWN_SHARDNUM) { - // Cache the random value so we only call ThreadLocalRandom once per DoFn instance. 
-          spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
-        } else {
-          spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
-        }
-        c.output(
-            unwrittenRecordsTag,
-            KV.of(
-                ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum),
-                c.element()));
-        return;
-      }
+        LOG.info("Opening writer for write operation {}", writeOperation);
+        writer = writeOperation.createWriter();
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+        LOG.debug("Done opening writer");
       }
-      writeOrClose(writer, formatFunction.apply(c.element()));
+      this.window = window;
+      writeOrClose(this.writer, c.element());
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry :
-          writers.entrySet()) {
-        Writer<OutputT, DestinationT> writer = entry.getValue();
-        FileResult<DestinationT> result;
-        try {
-          result = writer.close();
-        } catch (Exception e) {
-          // If anything goes wrong, make sure to delete the temporary file.
-          writer.cleanup();
-          throw e;
-        }
-        BoundedWindow window = entry.getKey().window;
-        c.output(result, window.maxTimestamp(), window);
+      if (writer == null) {
+        return;
       }
+      FileResult result = writer.close();
+      c.output(result, window.maxTimestamp(), window);
     }
 
     @Override
@@ -456,62 +352,38 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
-
-  /*
-   * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
-   * single iterable.
+  /**
+   * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements
+   * for each shard have been collected into a single iterable.
    */
-  private class WriteShardedBundles
-      extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
-    ShardAssignment shardNumberAssignment;
-    WriteShardedBundles(ShardAssignment shardNumberAssignment) {
-      this.shardNumberAssignment = shardNumberAssignment;
-    }
-
+  private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // Since we key by a 32-bit hash of the destination, there might be multiple destinations
-      // in this iterable. The number of destinations is generally very small (1000s or less), so
-      // there will rarely be hash collisions.
-      Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap();
-      for (UserT input : c.element().getValue()) {
-        DestinationT destination = sink.getDynamicDestinations().getDestination(input);
-        Writer<OutputT, DestinationT> writer = writers.get(destination);
-        if (writer == null) {
-          LOG.debug("Opening writer for write operation {}", writeOperation);
-          writer = writeOperation.createWriter();
-          if (windowedWrites) {
-            int shardNumber =
-                shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-                    ? c.element().getKey().getShardNumber()
-                    : UNKNOWN_SHARDNUM;
-            writer.openWindowed(
-                UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
-          } else {
-            writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
-          }
-          LOG.debug("Done opening writer");
-          writers.put(destination, writer);
-        }
-        writeOrClose(writer, formatFunction.apply(input));
-      }
+      // In a sharded write, a single input element represents one shard.
We can open and close + // the writer in each call to processElement. + LOG.info("Opening writer for write operation {}", writeOperation); + Writer<T> writer = writeOperation.createWriter(); + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey()); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); + } + LOG.debug("Done opening writer"); - // Close all writers. - for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) { - Writer<OutputT, DestinationT> writer = entry.getValue(); - FileResult<DestinationT> result; - try { - // Close the writer; if this throws let the error propagate. - result = writer.close(); - c.output(result); - } catch (Exception e) { - // If anything goes wrong, make sure to delete the temporary file. - writer.cleanup(); - throw e; + try { + for (T t : c.element().getValue()) { + writeOrClose(writer, t); } + + // Close the writer; if this throws let the error propagate. + FileResult result = writer.close(); + c.output(result); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); + throw e; } - } + } @Override public void populateDisplayData(DisplayData.Builder builder) { @@ -519,15 +391,12 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - private static <OutputT, DestinationT> void writeOrClose( - Writer<OutputT, DestinationT> writer, OutputT t) throws Exception { + private static <T> void writeOrClose(Writer<T> writer, T t) throws Exception { try { writer.write(t); } catch (Exception e) { try { writer.close(); - // If anything goes wrong, make sure to delete the temporary file. - writer.cleanup(); } catch (Exception closeException) { if (closeException instanceof InterruptedException) { // Do not silently ignore interrupted state. @@ -540,25 +409,20 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> { + private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> { private final PCollectionView<Integer> numShardsView; private final ValueProvider<Integer> numShardsProvider; - private final Coder<DestinationT> destinationCoder; - private int shardNumber; - ApplyShardingKey( - PCollectionView<Integer> numShardsView, - ValueProvider<Integer> numShardsProvider, - Coder<DestinationT> destinationCoder) { - this.destinationCoder = destinationCoder; + ApplyShardingKey(PCollectionView<Integer> numShardsView, + ValueProvider<Integer> numShardsProvider) { this.numShardsView = numShardsView; this.numShardsProvider = numShardsProvider; shardNumber = UNKNOWN_SHARDNUM; } @ProcessElement - public void processElement(ProcessContext context) throws IOException { + public void processElement(ProcessContext context) { final int shardCount; if (numShardsView != null) { shardCount = context.sideInput(numShardsView); @@ -578,110 +442,65 @@ public class WriteFiles<UserT, DestinationT, OutputT> } else { shardNumber = (shardNumber + 1) % shardCount; } - // We avoid using destination itself as a sharding key, because destination is often large. - // e.g. when using {@link DefaultFilenamePolicy}, the destination contains the entire path - // to the file. Often most of the path is constant across all destinations, just the path - // suffix is appended by the destination function. 
Instead we key by a 32-bit hash (carefully
-      // chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve
-      // the destinations. This does mean that multiple destinations might end up on the same shard,
-      // however the number of collisions should be small, so there's no need to worry about memory
-      // issues.
-      DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
-      context.output(
-          KV.of(
-              ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
-              context.element()));
+      context.output(KV.of(shardNumber, context.element()));
     }
   }
 
   /**
   * A write is performed as a sequence of three {@link ParDo}'s.
   *
-   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
-   * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
-   * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
-   * respectively, and {@link Writer#write} method is called for every element in the bundle. The
-   * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
-   * FileBasedSink} for a description of writer results)-one for each bundle.
+   * <p>This singleton collection containing the WriteOperation is then used as a side
+   * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase,
+   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+   * {@link Writer#open} and {@link Writer#close} are called in
+   * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and
+   * the {@link Writer#write} method is called for every element in the bundle. The output
+   * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link FileBasedSink}
+   * for a description of writer results), one for each bundle.
   *
   * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
-   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
-   * the write.
+   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called
+   * to finalize the write.
   *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-   * before the exception that caused the write to fail is propagated and the write result will be
-   * discarded.
+   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be
+   * called before the exception that caused the write to fail is propagated and the write result
+   * will be discarded.
   *
   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
   * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-   * WriteOperation).
+   * WriteOperation object that occurs during initialization is visible in the latter
+   * phases. However, the WriteOperation is not serialized after the bundle-writing
+   * phase. This is why implementations should guarantee that
+   * {@link WriteOperation#createWriter} does not mutate the WriteOperation.
*/ - private PDone createWrite(PCollection<UserT> input) { + private PDone createWrite(PCollection<T> input) { Pipeline p = input.getPipeline(); if (!windowedWrites) { // Re-window the data into the global window and remove any existing triggers. input = input.apply( - Window.<UserT>into(new GlobalWindows()) + Window.<T>into(new GlobalWindows()) .triggering(DefaultTrigger.of()) .discardingFiredPanes()); } + // Perform the per-bundle writes as a ParDo on the input PCollection (with the // WriteOperation as a side input) and collect the results of the writes in a // PCollection. There is a dependency between this ParDo and the first (the // WriteOperation PCollection as a side input), so this will happen after the // initial ParDo. - PCollection<FileResult<DestinationT>> results; + PCollection<FileResult> results; final PCollectionView<Integer> numShardsView; - @SuppressWarnings("unchecked") Coder<BoundedWindow> shardedWindowCoder = (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); - final Coder<DestinationT> destinationCoder; - try { - destinationCoder = - sink.getDynamicDestinations() - .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()); - destinationCoder.verifyDeterministic(); - } catch (CannotProvideCoderException | NonDeterministicException e) { - throw new RuntimeException(e); - } - if (computeNumShards == null && numShardsProvider == null) { numShardsView = null; - TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecordsTag"); - TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag = - new TupleTag<>("unwrittenRecordsTag"); - String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles"; - PCollectionTuple writeTuple = - input.apply( - writeName, - ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder)) - .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag))); - PCollection<FileResult<DestinationT>> writtenBundleFiles = - writeTuple - .get(writtenRecordsTag) - .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); - // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in - // finalize to stay consistent with what WriteWindowedBundles does. - PCollection<FileResult<DestinationT>> writtenGroupedFiles = - writeTuple - .get(unwrittedRecordsTag) - .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) - .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create()) - .apply( - "WriteUnwritten", - ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))) - .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); results = - PCollectionList.of(writtenBundleFiles) - .and(writtenGroupedFiles) - .apply(Flatten.<FileResult<DestinationT>>pCollections()); + input.apply( + "WriteBundles", + ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles())); } else { List<PCollectionView<?>> sideInputs = Lists.newArrayList(); if (computeNumShards != null) { @@ -690,31 +509,20 @@ public class WriteFiles<UserT, DestinationT, OutputT> } else { numShardsView = null; } - PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded = + + PCollection<KV<Integer, Iterable<T>>> sharded = input - .apply( - "ApplyShardLabel", - ParDo.of( - new ApplyShardingKey( - numShardsView, - (numShardsView != null) ? 
null : numShardsProvider, - destinationCoder)) - .withSideInputs(sideInputs)) - .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) - .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()); + .apply("ApplyShardLabel", ParDo.of( + new ApplyShardingKey<T>(numShardsView, + (numShardsView != null) ? null : numShardsProvider)) + .withSideInputs(sideInputs)) + .apply("GroupIntoShards", GroupByKey.<Integer, T>create()); shardedWindowCoder = (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder(); - // Since this path might be used by streaming runners processing triggers, it's important - // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE - // strategy works by sorting all FileResult objects and assigning them numbers, which is not - // guaranteed to work well when processing triggers - if the finalize step retries it might - // see a different Iterable of FileResult objects, and it will assign different shard numbers. - results = - sharded.apply( - "WriteShardedBundles", - ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))); + + results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles())); } - results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); + results.setCoder(FileResultCoder.of(shardedWindowCoder)); if (windowedWrites) { // When processing streaming windowed writes, results will arrive multiple times. This @@ -722,31 +530,26 @@ public class WriteFiles<UserT, DestinationT, OutputT> // as new data arriving into a side input does not trigger the listening DoFn. Instead // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered // whenever new data arrives. - PCollection<KV<Void, FileResult<DestinationT>>> keyedResults = - results.apply( - "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null)); - keyedResults.setCoder( - KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder))); + PCollection<KV<Void, FileResult>> keyedResults = + results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null)); + keyedResults.setCoder(KvCoder.of(VoidCoder.of(), + FileResultCoder.of(shardedWindowCoder))); // Is the continuation trigger sufficient? 
keyedResults - .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create()) - .apply( - "Finalize", - ParDo.of( - new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("Finalizing write operation {}.", writeOperation); - List<FileResult<DestinationT>> results = - Lists.newArrayList(c.element().getValue()); - writeOperation.finalize(results); - LOG.debug("Done finalizing write operation"); - } - })); + .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create()) + .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info("Finalizing write operation {}.", writeOperation); + List<FileResult> results = Lists.newArrayList(c.element().getValue()); + writeOperation.finalize(results); + LOG.debug("Done finalizing write operation"); + } + })); } else { - final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = - results.apply(View.<FileResult<DestinationT>>asIterable()); + final PCollectionView<Iterable<FileResult>> resultsView = + results.apply(View.<FileResult>asIterable()); ImmutableList.Builder<PCollectionView<?>> sideInputs = ImmutableList.<PCollectionView<?>>builder().add(resultsView); if (numShardsView != null) { @@ -762,53 +565,41 @@ public class WriteFiles<UserT, DestinationT, OutputT> // set numShards, then all shards will be written out as empty files. For this reason we // use a side input here. PCollection<Void> singletonCollection = p.apply(Create.of((Void) null)); - singletonCollection.apply( - "Finalize", - ParDo.of( - new DoFn<Void, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("Finalizing write operation {}.", writeOperation); - List<FileResult<DestinationT>> results = - Lists.newArrayList(c.sideInput(resultsView)); - LOG.debug( - "Side input initialized to finalize write operation {}.", writeOperation); - - // We must always output at least 1 shard, and honor user-specified numShards - // if - // set. 
- int minShardsNeeded; - if (numShardsView != null) { - minShardsNeeded = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - minShardsNeeded = numShardsProvider.get(); - } else { - minShardsNeeded = 1; - } - int extraShardsNeeded = minShardsNeeded - results.size(); - if (extraShardsNeeded > 0) { - LOG.info( - "Creating {} empty output shards in addition to {} written " - + "for a total of {}.", - extraShardsNeeded, - results.size(), - minShardsNeeded); - for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<OutputT, DestinationT> writer = writeOperation.createWriter(); - writer.openUnwindowed( - UUID.randomUUID().toString(), - UNKNOWN_SHARDNUM, - sink.getDynamicDestinations().getDefaultDestination()); - FileResult<DestinationT> emptyWrite = writer.close(); - results.add(emptyWrite); - } - LOG.debug("Done creating extra shards."); - } - writeOperation.finalize(results); - LOG.debug("Done finalizing write operation {}", writeOperation); - } - }) - .withSideInputs(sideInputs.build())); + singletonCollection + .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info("Finalizing write operation {}.", writeOperation); + List<FileResult> results = Lists.newArrayList(c.sideInput(resultsView)); + LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); + + // We must always output at least 1 shard, and honor user-specified numShards if + // set. + int minShardsNeeded; + if (numShardsView != null) { + minShardsNeeded = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + minShardsNeeded = numShardsProvider.get(); + } else { + minShardsNeeded = 1; + } + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written for a total of {}.", + extraShardsNeeded, results.size(), minShardsNeeded); + for (int i = 0; i < extraShardsNeeded; ++i) { + Writer<T> writer = writeOperation.createWriter(); + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); + FileResult emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards."); + } + writeOperation.finalize(results); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + }).withSideInputs(sideInputs.build())); } return PDone.in(input.getPipeline()); }
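To make the sharded path above concrete: ApplyShardingKey gives each element a shard number round-robin, starting at a random shard so that parallel instances do not all begin at shard 0, and GroupIntoShards then collects one iterable per shard for WriteShardedBundles to write. A standalone sketch of just the assignment logic (the class and method names here are illustrative, not part of the patch):

    import java.util.concurrent.ThreadLocalRandom;

    // Mirrors ApplyShardingKey.processElement: the first element seen by this
    // instance picks a random shard; later elements cycle round-robin.
    final class RoundRobinShardAssigner {
      private static final int UNKNOWN_SHARDNUM = -1;
      private int shardNumber = UNKNOWN_SHARDNUM;

      int nextShard(int shardCount) {
        if (shardNumber == UNKNOWN_SHARDNUM) {
          shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
        } else {
          shardNumber = (shardNumber + 1) % shardCount;
        }
        return shardNumber;
      }
    }

Keying by this small integer, rather than by anything element-derived, keeps the GroupByKey cheap while still bounding the number of output files.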
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java index b889ec7..99717a4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java @@ -71,10 +71,6 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> { "Trying to return record which is before the last-returned record"); if (position == null) { - LOG.info( - "Adjusting range start from {} to {} as position of first returned record", - range.getStartKey(), - recordStart); range = range.withStartKey(recordStart); } position = recordStart; @@ -91,15 +87,6 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> { @Override public synchronized boolean trySplitAtPosition(ByteKey splitPosition) { - // Sanity check. - if (!range.containsKey(splitPosition)) { - LOG.warn( - "{}: Rejecting split request at {} because it is not within the range.", - this, - splitPosition); - return false; - } - // Unstarted. if (position == null) { LOG.warn( @@ -119,6 +106,15 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> { return false; } + // Sanity check. + if (!range.containsKey(splitPosition)) { + LOG.warn( + "{}: Rejecting split request at {} because it is not within the range.", + this, + splitPosition); + return false; + } + range = range.withEndKey(splitPosition); return true; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java deleted file mode 100644 index d3bff37..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.range; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; - -/** A restriction represented by a range of integers [from, to). 
*/ -public class OffsetRange - implements Serializable, - HasDefaultTracker< - OffsetRange, org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker> { - private final long from; - private final long to; - - public OffsetRange(long from, long to) { - checkArgument(from <= to, "Malformed range [%s, %s)", from, to); - this.from = from; - this.to = to; - } - - public long getFrom() { - return from; - } - - public long getTo() { - return to; - } - - @Override - public org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker newTracker() { - return new org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker(this); - } - - @Override - public String toString() { - return "[" + from + ", " + to + ')'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - OffsetRange that = (OffsetRange) o; - - if (from != that.from) { - return false; - } - return to == that.to; - } - - @Override - public int hashCode() { - int result = (int) (from ^ (from >>> 32)); - result = 31 * result + (int) (to ^ (to >>> 32)); - return result; - } - - public List<OffsetRange> split(long desiredNumOffsetsPerSplit, long minNumOffsetPerSplit) { - List<OffsetRange> res = new ArrayList<>(); - long start = getFrom(); - long maxEnd = getTo(); - - while (start < maxEnd) { - long end = start + desiredNumOffsetsPerSplit; - end = Math.min(end, maxEnd); - // Avoid having a too small range at the end and ensure that we respect minNumOffsetPerSplit. - long remaining = maxEnd - end; - if ((remaining < desiredNumOffsetsPerSplit / 4) || (remaining < minNumOffsetPerSplit)) { - end = maxEnd; - } - res.add(new OffsetRange(start, end)); - start = end; - } - return res; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java index 8f0083e..51e2b1a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java @@ -26,9 +26,6 @@ import org.slf4j.LoggerFactory; /** * A {@link RangeTracker} for non-negative positions of type {@code long}. - * - * <p>Not to be confused with {@link - * org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker}. 
*/ public class OffsetRangeTracker implements RangeTracker<Long> { private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index d7e6cc8..c0990cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -184,20 +184,18 @@ public class PipelineOptionsFactory { private final String[] args; private final boolean validation; private final boolean strictParsing; - private final boolean isCli; // Do not allow direct instantiation private Builder() { - this(null, false, true, false); + this(null, false, true); } private Builder(String[] args, boolean validation, - boolean strictParsing, boolean isCli) { + boolean strictParsing) { this.defaultAppName = findCallersClassName(); this.args = args; this.validation = validation; this.strictParsing = strictParsing; - this.isCli = isCli; } /** @@ -239,7 +237,7 @@ public class PipelineOptionsFactory { */ public Builder fromArgs(String... args) { checkNotNull(args, "Arguments should not be null."); - return new Builder(args, validation, strictParsing, true); + return new Builder(args, validation, strictParsing); } /** @@ -249,7 +247,7 @@ public class PipelineOptionsFactory { * validation. */ public Builder withValidation() { - return new Builder(args, true, strictParsing, isCli); + return new Builder(args, true, strictParsing); } /** @@ -257,7 +255,7 @@ public class PipelineOptionsFactory { * arguments. */ public Builder withoutStrictParsing() { - return new Builder(args, validation, false, isCli); + return new Builder(args, validation, false); } /** @@ -302,11 +300,7 @@ public class PipelineOptionsFactory { } if (validation) { - if (isCli) { - PipelineOptionsValidator.validateCli(klass, t); - } else { - PipelineOptionsValidator.validate(klass, t); - } + PipelineOptionsValidator.validate(klass, t); } return t; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java index fcffd74..bd54ec3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java @@ -43,29 +43,9 @@ public class PipelineOptionsValidator { * * @param klass The interface to fetch validation criteria from. * @param options The {@link PipelineOptions} to validate. - * @return Validated options. + * @return The type */ public static <T extends PipelineOptions> T validate(Class<T> klass, PipelineOptions options) { - return validate(klass, options, false); - } - - /** - * Validates that the passed {@link PipelineOptions} from command line interface (CLI) - * conforms to all the validation criteria from the passed in interface. 
- * - * <p>Note that the interface requested must conform to the validation criteria specified on - * {@link PipelineOptions#as(Class)}. - * - * @param klass The interface to fetch validation criteria from. - * @param options The {@link PipelineOptions} to validate. - * @return Validated options. - */ - public static <T extends PipelineOptions> T validateCli(Class<T> klass, PipelineOptions options) { - return validate(klass, options, true); - } - - private static <T extends PipelineOptions> T validate(Class<T> klass, PipelineOptions options, - boolean isCli) { checkNotNull(klass); checkNotNull(options); checkArgument(Proxy.isProxyClass(options.getClass())); @@ -87,15 +67,9 @@ public class PipelineOptionsValidator { requiredGroups.put(requiredGroup, method); } } else { - if (isCli) { - checkArgument(handler.invoke(asClassOptions, method, null) != null, - "Missing required value for [--%s, \"%s\"]. ", - handler.getOptionName(method), getDescription(method)); - } else { - checkArgument(handler.invoke(asClassOptions, method, null) != null, - "Missing required value for [%s, \"%s\"]. ", - method, getDescription(method)); - } + checkArgument(handler.invoke(asClassOptions, method, null) != null, + "Missing required value for [%s, \"%s\"]. ", + method, getDescription(method)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index 926a7b9..eda21a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -45,8 +45,6 @@ import com.google.common.collect.Multimap; import com.google.common.collect.MutableClassToInstanceMap; import java.beans.PropertyDescriptor; import java.io.IOException; -import java.io.NotSerializableException; -import java.io.Serializable; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -89,7 +87,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers; * {@link PipelineOptions#as(Class)}. */ @ThreadSafe -class ProxyInvocationHandler implements InvocationHandler, Serializable { +class ProxyInvocationHandler implements InvocationHandler { /** * No two instances of this class are considered equivalent hence we generate a random hash code. */ @@ -166,21 +164,6 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable { + Arrays.toString(args) + "]."); } - public String getOptionName(Method method) { - return gettersToPropertyNames.get(method.getName()); - } - - private void writeObject(java.io.ObjectOutputStream stream) - throws IOException { - throw new NotSerializableException( - "PipelineOptions objects are not serializable and should not be embedded into transforms " - + "(did you capture a PipelineOptions object in a field or in an anonymous class?). " - + "Instead, if you're using a DoFn, access PipelineOptions at runtime " - + "via ProcessContext/StartBundleContext/FinishBundleContext.getPipelineOptions(), " - + "or pre-extract necessary fields from PipelineOptions " - + "at pipeline construction time."); - } - /** * Track whether options values are explicitly set, or retrieved from defaults. 
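 */

The writeObject guard removed above carried advice that remains easy to illustrate: do not capture PipelineOptions in a DoFn field, where it would be serialized with the transform; read it from the context at runtime instead. A minimal sketch of that pattern (this DoFn is hypothetical):

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.DoFn;

    class TagWithJobName extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Fetch options from the runtime context, not from a serialized field.
        PipelineOptions opts = c.getPipelineOptions();
        c.output(opts.getJobName() + ":" + c.element());
      }
    }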
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d8ff59e..2f0e8ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -24,23 +24,21 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -70,7 +68,7 @@ public class TransformHierarchy {
     producers = new HashMap<>();
     producerInput = new HashMap<>();
     unexpandedInputs = new HashMap<>();
-    root = new Node();
+    root = new Node(null, null, "", null);
     current = root;
   }
 
@@ -145,6 +143,10 @@ public class TransformHierarchy {
       Node producerNode = getProducer(inputValue);
       PInput input = producerInput.remove(inputValue);
       inputValue.finishSpecifying(input, producerNode.getTransform());
+      checkState(
+          producers.get(inputValue) != null,
+          "Producer unknown for input %s",
+          inputValue);
     }
   }
 
@@ -159,7 +165,7 @@
   * nodes.
*/ public void setOutput(POutput output) { - for (PCollection<?> value : fullyExpand(output).values()) { + for (PValue value : output.expand().values()) { if (!producers.containsKey(value)) { producers.put(value, current); value.finishSpecifyingOutput( @@ -193,13 +199,13 @@ public class TransformHierarchy { } Node getProducer(PValue produced) { - return checkNotNull(producers.get(produced), "No producer found for %s", produced); + return producers.get(produced); } public Set<PValue> visit(PipelineVisitor visitor) { finishSpecifying(); Set<PValue> visitedValues = new HashSet<>(); - root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>()); + root.visit(visitor, visitedValues); return visitedValues; } @@ -220,47 +226,6 @@ public class TransformHierarchy { return current; } - private Map<TupleTag<?>, PCollection<?>> fullyExpand(POutput output) { - Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>(); - for (Map.Entry<TupleTag<?>, PValue> value : output.expand().entrySet()) { - if (value.getValue() instanceof PCollection) { - PCollection<?> previous = result.put(value.getKey(), (PCollection<?>) value.getValue()); - checkArgument( - previous == null, - "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", - output, - TupleTag.class.getSimpleName(), - value.getKey(), - previous, - value.getValue()); - } else { - if (value.getValue().expand().size() == 1 - && Iterables.getOnlyElement(value.getValue().expand().values()) - .equals(value.getValue())) { - throw new IllegalStateException( - String.format( - "Non %s %s that expands into itself %s", - PCollection.class.getSimpleName(), - PValue.class.getSimpleName(), - value.getValue())); - } - for (Map.Entry<TupleTag<?>, PCollection<?>> valueComponent : - fullyExpand(value.getValue()).entrySet()) { - PCollection<?> previous = result.put(valueComponent.getKey(), valueComponent.getValue()); - checkArgument( - previous == null, - "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", - output, - TupleTag.class.getSimpleName(), - valueComponent.getKey(), - previous, - valueComponent.getValue()); - } - } - } - return result; - } - /** * Provides internal tracking of transform relationships with helper methods * for initialization and ordered visitation. @@ -288,36 +253,25 @@ public class TransformHierarchy { boolean finishedSpecifying = false; /** - * Creates the root-level node. The root level node has a null enclosing node, a null transform, - * an empty map of inputs, and a name equal to the empty string. - */ - private Node() { - this.enclosingNode = null; - this.transform = null; - this.fullName = ""; - this.inputs = Collections.emptyMap(); - } - - /** * Creates a new Node with the given parent and transform. * + * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other + * nodes. 
+ * * @param enclosingNode the composite node containing this node * @param transform the PTransform tracked by this node * @param fullName the fully qualified name of the transform * @param input the unexpanded input to the transform */ private Node( - Node enclosingNode, - PTransform<?, ?> transform, + @Nullable Node enclosingNode, + @Nullable PTransform<?, ?> transform, String fullName, - PInput input) { + @Nullable PInput input) { this.enclosingNode = enclosingNode; this.transform = transform; this.fullName = fullName; - ImmutableMap.Builder<TupleTag<?>, PValue> inputs = ImmutableMap.builder(); - inputs.putAll(input.expand()); - inputs.putAll(transform.getAdditionalInputs()); - this.inputs = inputs.build(); + this.inputs = input == null ? Collections.<TupleTag<?>, PValue>emptyMap() : input.expand(); } /** @@ -398,7 +352,7 @@ public class TransformHierarchy { return fullName; } - /** Returns the transform input, in fully expanded form. */ + /** Returns the transform input, in unexpanded form. */ public Map<TupleTag<?>, PValue> getInputs() { return inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs; } @@ -505,60 +459,10 @@ public class TransformHierarchy { /** * Visit the transform node. * - * <p>The visit proceeds in the following order: - * - * <ul> - * <li>Visit all input {@link PValue PValues} returned by the flattened expansion of {@link - * Node#getInputs()}. - * <li>If the node is a composite: - * <ul> - * <li>Enter the node via {@link PipelineVisitor#enterCompositeTransform(Node)}. - * <li>If the result of {@link PipelineVisitor#enterCompositeTransform(Node)} was {@link - * CompositeBehavior#ENTER_TRANSFORM}, visit each child node of this {@link Node}. - * <li>Leave the node via {@link PipelineVisitor#leaveCompositeTransform(Node)}. - * </ul> - * <li>If the node is a primitive, visit it via {@link - * PipelineVisitor#visitPrimitiveTransform(Node)}. - * <li>Visit each {@link PValue} that was output by this node. - * </ul> - * - * <p>Additionally, the following ordering restrictions are observed: - * - * <ul> - * <li>A {@link Node} will be visited after its enclosing node has been entered and before its - * enclosing node has been left - * <li>A {@link Node} will not be visited if any enclosing {@link Node} has returned {@link - * CompositeBehavior#DO_NOT_ENTER_TRANSFORM} from the call to {@link - * PipelineVisitor#enterCompositeTransform(Node)}. - * <li>A {@link PValue} will only be visited after the {@link Node} that originally produced - * it has been visited. - * </ul> - * * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for * composite transforms), then the output values. */ - private void visit( - PipelineVisitor visitor, - Set<PValue> visitedValues, - Set<Node> visitedNodes, - Set<Node> skippedComposites) { - if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) { - // Recursively enter all enclosing nodes, as appropriate. - getEnclosingNode().visit(visitor, visitedValues, visitedNodes, skippedComposites); - } - // These checks occur after visiting the enclosing node to ensure that if this node has been - // visited while visiting the enclosing node the node is not revisited, or, if an enclosing - // Node is skipped, this node is also skipped. 
- if (!visitedNodes.add(this)) { - LOG.debug("Not revisiting previously visited node {}", this); - return; - } else if (childNodeOf(skippedComposites)) { - // This node is a child of a node that has been passed over via CompositeBehavior, and - // should also be skipped. All child nodes of a skipped composite should always be skipped. - LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this); - return; - } - + private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) { if (!finishedSpecifying) { finishSpecifying(); } @@ -566,31 +470,22 @@ public class TransformHierarchy { if (!isRootNode()) { // Visit inputs. for (PValue inputValue : inputs.values()) { - Node valueProducer = getProducer(inputValue); - if (!visitedNodes.contains(valueProducer)) { - valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites); - } if (visitedValues.add(inputValue)) { - LOG.debug("Visiting input value {}", inputValue); - visitor.visitValue(inputValue, valueProducer); + visitor.visitValue(inputValue, getProducer(inputValue)); } } } if (isCompositeNode()) { - LOG.debug("Visiting composite node {}", this); PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this); if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) { for (Node child : parts) { - child.visit(visitor, visitedValues, visitedNodes, skippedComposites); + child.visit(visitor, visitedValues); } - } else { - skippedComposites.add(this); } visitor.leaveCompositeTransform(this); } else { - LOG.debug("Visiting primitive node {}", this); visitor.visitPrimitiveTransform(this); } @@ -599,24 +494,12 @@ public class TransformHierarchy { // Visit outputs. for (PValue pValue : outputs.values()) { if (visitedValues.add(pValue)) { - LOG.debug("Visiting output value {}", pValue); visitor.visitValue(pValue, this); } } } } - private boolean childNodeOf(Set<Node> nodes) { - if (isRootNode()) { - return false; - } - Node parent = this.getEnclosingNode(); - while (!parent.isRootNode() && !nodes.contains(parent)) { - parent = parent.getEnclosingNode(); - } - return nodes.contains(parent); - } - /** * Finish specifying a transform. 
* http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java index eba6978..c11057a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java @@ -126,9 +126,4 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> { } }; } - - @Override - public boolean assignsToOneWindow() { - return true; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java index d13fcf1..9ad8fd8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java @@ -271,18 +271,6 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { return events; } - /** - * <b>For internal use only. No backwards-compatibility guarantees.</b> - * - * <p>Builder a test stream directly from events. No validation is performed on - * watermark monotonicity, etc. This is assumed to be a previously-serialized - * {@link TestStream} transform that is correct by construction. - */ - @Internal - public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) { - return new TestStream<>(coder, events); - } - @Override public boolean equals(Object other) { if (!(other instanceof TestStream)) { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index d7effb5..9e1cc71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.io.IOException; import java.io.InputStream; @@ -1121,7 +1122,11 @@ public class Combine { */ @Override public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return PCollectionViews.toAdditionalInputs(sideInputs); + ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder(); + for (PCollectionView<?> sideInput : sideInputs) { + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); + } + return additionalInputs.build(); } /** @@ -1272,15 +1277,14 @@ public class Combine { public PCollectionView<OutputT> expand(PCollection<InputT> input) { PCollection<OutputT> combined = input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout)); - PCollectionView<OutputT> view = - 
PCollectionViews.singletonView( - combined, - input.getWindowingStrategy(), - insertDefault, - insertDefault ? fn.defaultValue() : null, - combined.getCoder()); - combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view)); - return view; + return combined.apply( + CreatePCollectionView.<OutputT, OutputT>of( + PCollectionViews.singletonView( + combined, + input.getWindowingStrategy(), + insertDefault, + insertDefault ? fn.defaultValue() : null, + combined.getCoder()))); } public int getFanout() { @@ -1573,7 +1577,11 @@ public class Combine { */ @Override public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return PCollectionViews.toAdditionalInputs(sideInputs); + ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder(); + for (PCollectionView<?> sideInput : sideInputs) { + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); + } + return additionalInputs.build(); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 1b809c2..e711ac2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.transforms; -import com.google.auto.value.AutoValue; import java.io.Serializable; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -386,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} { * * {@literal @StateId("my-state-id")} - * {@literal private final StateSpec<ValueState<MyState>>} myStateSpec = + * {@literal private final StateSpec<K, ValueState<MyState>>} myStateSpec = * StateSpecs.value(new MyStateCoder()); * * {@literal @ProcessElement} @@ -546,15 +545,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}. * <li>It <i>may</i> define a {@link GetRestrictionCoder} method. * <li>The type of restrictions used by all of these methods must be the same. - * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to - * indicate whether there is more work to be done for the current element. * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as * {@link BoundedWindow}. * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or * {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with - * either of these, it's assumed to be {@link BoundedPerElement} if its {@link - * ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it - * returns a {@link ProcessContinuation}. + * either of these, it's assumed to be {@link BoundedPerElement}. * </ul> * * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods. 
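The javadoc hunk above reverts splittable DoFn to a simpler contract: @ProcessElement returns void, and an unannotated splittable DoFn is assumed to be BoundedPerElement. A minimal sketch of a splittable DoFn under that contract, assuming the OffsetRange and OffsetRangeTracker classes present in the SDK at this commit (EmitOffsetsFn and the range it emits are illustrative, not part of the patch):

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

    // Illustrative splittable DoFn: for an input n, emits the offsets [0, n).
    // Its @ProcessElement returns void, so under the reverted javadoc above it
    // is assumed to be BoundedPerElement.
    class EmitOffsetsFn extends DoFn<Integer, Long> {
      @ProcessElement
      public void process(ProcessContext c, OffsetRangeTracker tracker) {
        // Claim each offset before emitting it; stop as soon as a claim fails.
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          c.output(i);
        }
      }

      @GetInitialRestriction
      public OffsetRange getInitialRestriction(Integer element) {
        return new OffsetRange(0, element);
      }
    }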
@@ -682,49 +677,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD @Experimental(Kind.SPLITTABLE_DO_FN) public @interface UnboundedPerElement {} - // This can't be put into ProcessContinuation itself due to the following problem: - // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html - private static final ProcessContinuation PROCESS_CONTINUATION_STOP = - new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO); - - /** - * When used as a return value of {@link ProcessElement}, indicates whether there is more work to - * be done for the current element. - * - * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call - * on the {@link RestrictionTracker}, then the call MUST return {@link #stop()}. - */ - @Experimental(Kind.SPLITTABLE_DO_FN) - @AutoValue - public abstract static class ProcessContinuation { - /** Indicates that there is no more work to be done for the current element. */ - public static ProcessContinuation stop() { - return PROCESS_CONTINUATION_STOP; - } - - /** Indicates that there is more work to be done for the current element. */ - public static ProcessContinuation resume() { - return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO); - } - - /** - * If false, the {@link DoFn} promises that there is no more work remaining for the current - * element, so the runner should not resume the {@link ProcessElement} call. - */ - public abstract boolean shouldResume(); - - /** - * A minimum duration that should elapse between the end of this {@link ProcessElement} call and - * the {@link ProcessElement} call continuing processing of the same element. By default, zero. - */ - public abstract Duration resumeDelay(); - - /** Builder method to set the value of {@link #resumeDelay()}. */ - public ProcessContinuation withResumeDelay(Duration resumeDelay) { - return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay); - } - } - /** * Finalize the {@link DoFn} construction to prepare for processing. * This method should be called by runners before any processing methods. 
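For contrast, the hunk above deletes the ProcessContinuation value class itself. Before this revert, a splittable @ProcessElement could return a ProcessContinuation to hand control back to the runner and be resumed later. A sketch of that now-removed usage, reconstructed from the deleted factory and builder methods (isAvailableYet() is a hypothetical availability check, not part of the SDK; Duration is org.joda.time.Duration):

    // Pre-revert sketch; it does not compile against the reverted SDK because
    // ProcessContinuation no longer exists. resume(), stop(), and
    // withResumeDelay() are exactly the methods deleted in the hunk above.
    @ProcessElement
    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
      for (long i = tracker.currentRestriction().getFrom(); ; ++i) {
        if (!isAvailableYet(i)) {  // hypothetical: offset i is not yet readable
          // Ask the runner to call ProcessElement again for this element later.
          return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10));
        }
        if (!tracker.tryClaim(i)) {
          // Per the deleted javadoc: after a failed tryClaim() the call MUST stop().
          return ProcessContinuation.stop();
        }
        c.output(i);
      }
    }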
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index b2377dd..8a03f3c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -290,11 +290,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - public PipelineOptions pipelineOptions() { - return getPipelineOptions(); - } - - @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext( DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( @@ -551,6 +546,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { fn.super(); } + private void throwUnsupportedOutputFromBundleMethods() { + throw new UnsupportedOperationException( + "DoFnTester doesn't support output from bundle methods"); + } + @Override public PipelineOptions getPipelineOptions() { return options; @@ -559,13 +559,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public void output( OutputT output, Instant timestamp, BoundedWindow window) { - output(mainOutputTag, output, timestamp, window); + throwUnsupportedOutputFromBundleMethods(); } @Override public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { - getMutableOutput(tag) - .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING)); + throwUnsupportedOutputFromBundleMethods(); } } @@ -643,6 +642,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { getMutableOutput(tag) .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } + + private void throwUnsupportedOutputFromBundleMethods() { + throw new UnsupportedOperationException( + "DoFnTester doesn't support output from bundle methods"); + } + } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 0d03835..edf1419 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @@ -32,7 +33,6 @@ import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.transforms.DoFn.WindowedContext; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -50,7 +50,6 @@ import org.apache.beam.sdk.util.SerializableUtils; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -456,27 +455,6 @@ public class ParDo { } } - private static void validateStateApplicableForInput( - DoFn<?, ?> fn, - PCollection<?> input) { - Coder<?> inputCoder = input.getCoder(); - checkArgument( - inputCoder instanceof KvCoder, - "%s requires its input to use %s in order to use state and timers.", - ParDo.class.getSimpleName(), - KvCoder.class.getSimpleName()); - - KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) inputCoder; - try { - kvCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException exc) { - throw new IllegalArgumentException( - String.format( - "%s requires a deterministic key coder in order to use state and timers", - ParDo.class.getSimpleName())); - } - } - /** * Try to provide coders for as many of the type arguments of given * {@link DoFnSignature.StateDeclaration} as possible. @@ -684,7 +662,11 @@ public class ParDo { */ @Override public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return PCollectionViews.toAdditionalInputs(sideInputs); + ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder(); + for (PCollectionView<?> sideInput : sideInputs) { + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); + } + return additionalInputs.build(); } } @@ -759,11 +741,6 @@ public class ParDo { // Use coder registry to determine coders for all StateSpec defined in the fn signature. finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder()); - DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); - if (signature.usesState() || signature.usesTimers()) { - validateStateApplicableForInput(fn, input); - } - PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()), @@ -830,7 +807,11 @@ public class ParDo { */ @Override public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return PCollectionViews.toAdditionalInputs(sideInputs); + ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder(); + for (PCollectionView<?> sideInput : sideInputs) { + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); + } + return additionalInputs.build(); } }
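The three getAdditionalInputs() hunks above (two in Combine, one in ParDo) inline the same loop that the removed PCollectionViews.toAdditionalInputs helper had factored out. Reconstructed from those call sites, the deleted helper was presumably close to this sketch (the exact signature may have differed):

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    // Sketch of the removed helper: maps each side input's internal tag to its
    // backing PCollection, exactly as the inlined loops above do.
    public static Map<TupleTag<?>, PValue> toAdditionalInputs(
        Iterable<PCollectionView<?>> sideInputs) {
      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
      for (PCollectionView<?> sideInput : sideInputs) {
        additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
      }
      return additionalInputs.build();
    }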