Repository: beam Updated Branches: refs/heads/master 8e5cfdea9 -> bc907c58b
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index 948a65b..16f3eb6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.util.List; @@ -30,7 +29,9 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Sink.WriteOperation; import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.options.PipelineOptions; @@ -42,7 +43,9 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -81,9 +84,19 @@ import org.slf4j.LoggerFactory; public class Write<T> extends PTransform<PCollection<T>, PDone> 
{ private static final Logger LOG = LoggerFactory.getLogger(Write.class); + private static final int UNKNOWN_SHARDNUM = -1; + private static final int UNKNOWN_NUMSHARDS = -1; + private final Sink<T> sink; + // This allows the number of shards to be dynamically computed based on the input + // PCollection. @Nullable private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards; + // We don't use a side input for static sharding, as we want this value to be updatable + // when a pipeline is updated. + @Nullable + private final ValueProvider<Integer> numShardsProvider; + private boolean windowedWrites; /** * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner @@ -91,21 +104,24 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { */ public static <T> Write<T> to(Sink<T> sink) { checkNotNull(sink, "sink"); - return new Write<>(sink, null /* runner-determined sharding */); + return new Write<>(sink, null /* runner-determined sharding */, null, false); } private Write( Sink<T> sink, - @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) { + @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, + @Nullable ValueProvider<Integer> numShardsProvider, + boolean windowedWrites) { this.sink = sink; this.computeNumShards = computeNumShards; + this.numShardsProvider = numShardsProvider; + this.windowedWrites = windowedWrites; } @Override public PDone expand(PCollection<T> input) { - checkArgument( - IsBounded.BOUNDED == input.isBounded(), - "%s can only be applied to a Bounded PCollection", + checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites, + "%s can only be applied to an unbounded PCollection if doing windowed writes", Write.class.getSimpleName()); PipelineOptions options = input.getPipeline().getOptions(); sink.validate(options); @@ -120,6 +136,11 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { 
.include("sink", sink); if (getSharding() != null) { builder.include("sharding", getSharding()); + } else if (getNumShards() != null) { + String numShards = getNumShards().isAccessible() + ? getNumShards().get().toString() : getNumShards().toString(); + builder.add(DisplayData.item("numShards", numShards) + .withLabel("Fixed Number of Shards")); } } @@ -141,6 +162,10 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { return computeNumShards; } + public ValueProvider<Integer> getNumShards() { + return numShardsProvider; + } + /** * Returns a new {@link Write} that will write to the current {@link Sink} using the * specified number of shards. @@ -165,8 +190,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for * more information. */ - public Write<T> withNumShards(ValueProvider<Integer> numShards) { - return new Write<>(sink, new ConstantShards<T>(numShards)); + public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) { + return new Write<>(sink, null, numShardsProvider, windowedWrites); } /** @@ -179,7 +204,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) { checkNotNull( sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead"); - return new Write<>(sink, sharding); + return new Write<>(sink, sharding, null, windowedWrites); } /** @@ -187,7 +212,25 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { * runner-determined sharding. */ public Write<T> withRunnerDeterminedSharding() { - return new Write<>(sink, null); + return new Write<>(sink, null, null, windowedWrites); + } + + /** + * Returns a new {@link Write} that preserves windowing on its input.
+ * + * <p>If this option is not specified, windowing and triggering are replaced by + * {@link GlobalWindows} and {@link DefaultTrigger}. + * + * <p>If there is no data for a window, no output shards will be generated for that window. + * If a window triggers multiple times, then more than a single output shard might be + * generated for that window; it's up to the sink implementation to keep these output shards + * unique. + * + * <p>This option can only be used if {@link #withNumShards(int)} is also set to a + * positive value. + */ + public Write<T> withWindowedWrites() { + return new Write<>(sink, computeNumShards, numShardsProvider, true); } /** @@ -205,13 +248,19 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { // Lazily initialize the Writer if (writer == null) { WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); LOG.info("Opening writer for write operation {}", writeOperation); writer = writeOperation.createWriter(c.getPipelineOptions()); - writer.open(UUID.randomUUID().toString()); + + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); } try {
WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView, + PCollectionView<Integer> numShardsView) { this.writeOperationView = writeOperationView; + this.numShardsView = numShardsView; } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get(); // In a sharded write, single input element represents one shard. We can open and close // the writer in each call to processElement. WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); LOG.info("Opening writer for write operation {}", writeOperation); Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); - writer.open(UUID.randomUUID().toString()); + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), + numShards); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); try { - for (T t : c.element().getValue()) { - writer.write(t); - } - } catch (Exception e) { try { - writer.close(); - } catch (Exception closeException) { - if (closeException instanceof InterruptedException) { - // Do not silently ignore interrupted state. - Thread.currentThread().interrupt(); + for (T t : c.element().getValue()) { + writer.write(t); } - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); + } catch (Exception e) { + try { + writer.close(); + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. 
+ e.addSuppressed(closeException); + } + throw e; } + + // Close the writer; if this throws let the error propagate. + WriteT result = writer.close(); + c.output(result); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); throw e; } - - // Close the writer; if this throws let the error propagate. - WriteT result = writer.close(); - c.output(result); } @Override @@ -302,23 +366,32 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { } private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> { - private final PCollectionView<Integer> numShards; + private final PCollectionView<Integer> numShardsView; + private final ValueProvider<Integer> numShardsProvider; private int shardNumber; - ApplyShardingKey(PCollectionView<Integer> numShards) { - this.numShards = numShards; - shardNumber = -1; + ApplyShardingKey(PCollectionView<Integer> numShardsView, + ValueProvider<Integer> numShardsProvider) { + this.numShardsView = numShardsView; + this.numShardsProvider = numShardsProvider; + shardNumber = UNKNOWN_SHARDNUM; } @ProcessElement public void processElement(ProcessContext context) { - Integer shardCount = context.sideInput(numShards); + int shardCount = 0; + if (numShardsView != null) { + shardCount = context.sideInput(numShardsView); + } else { + checkNotNull(numShardsProvider); + shardCount = numShardsProvider.get(); + } checkArgument( shardCount > 0, "Must have a positive number of shards specified for non-runner-determined sharding." + " Got %s", shardCount); - if (shardNumber == -1) { + if (shardNumber == UNKNOWN_SHARDNUM) { // We want to desynchronize the first record sharding key for each instance of // ApplyShardingKey, so records in a small PCollection will be statistically balanced. 
shardNumber = ThreadLocalRandom.current().nextInt(shardCount); @@ -340,8 +413,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { * <p>This singleton collection containing the WriteOperation is then used as a side input to a * ParDo over the PCollection of elements to write. In this bundle-writing phase, * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. - * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn.StartBundle} and - * {@link DoFn.FinishBundle}, respectively, and {@link Writer#write} method is called for + * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and + * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for * every element in the bundle. The output of this ParDo is a PCollection of * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for * each bundle. @@ -364,6 +437,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { private <WriteT> PDone createWrite( PCollection<T> input, WriteOperation<T, WriteT> writeOperation) { Pipeline p = input.getPipeline(); + writeOperation.setWindowedWrites(windowedWrites); // A coder to use for the WriteOperation. @SuppressWarnings("unchecked") @@ -373,7 +447,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize // the sink. PCollection<WriteOperation<T, WriteT>> operationCollection = - p.apply(Create.of(writeOperation).withCoder(operationCoder)); + p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder)); // Initialize the resource in a do-once ParDo on the WriteOperation. 
operationCollection = operationCollection @@ -384,6 +458,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { WriteOperation<T, WriteT> writeOperation = c.element(); LOG.info("Initializing write operation {}", writeOperation); writeOperation.initialize(c.getPipelineOptions()); + writeOperation.setWindowedWrites(windowedWrites); LOG.debug("Done initializing write operation {}", writeOperation); // The WriteOperation is also the output of this ParDo, so it can have mutable // state. @@ -396,133 +471,133 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> { final PCollectionView<WriteOperation<T, WriteT>> writeOperationView = operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton()); - // Re-window the data into the global window and remove any existing triggers. - PCollection<T> inputInGlobalWindow = - input.apply( - Window.<T>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); + if (!windowedWrites) { + // Re-window the data into the global window and remove any existing triggers. + input = + input.apply( + Window.<T>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + } + // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation // as a side input) and collect the results of the writes in a PCollection. // There is a dependency between this ParDo and the first (the WriteOperation PCollection // as a side input), so this will happen after the initial ParDo. 
PCollection<WriteT> results; - final PCollectionView<Integer> numShards; - if (computeNumShards == null) { - numShards = null; - results = - inputInGlobalWindow.apply( - "WriteBundles", + final PCollectionView<Integer> numShardsView; + if (computeNumShards == null && numShardsProvider == null) { + if (windowedWrites) { + throw new IllegalStateException("When doing windowed writes, numShards must be set " + + "explicitly to a positive value"); + } + numShardsView = null; + results = input + .apply("WriteBundles", ParDo.of(new WriteBundles<>(writeOperationView)) .withSideInputs(writeOperationView)); } else { - numShards = inputInGlobalWindow.apply(computeNumShards); - results = - inputInGlobalWindow - .apply( - "ApplyShardLabel", - ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards)) - .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) - .apply( - "WriteShardedBundles", - ParDo.of(new WriteShardedBundles<>(writeOperationView)) - .withSideInputs(writeOperationView)); + if (computeNumShards != null) { + numShardsView = input.apply(computeNumShards); + results = input + .apply("ApplyShardLabel", ParDo.of( + new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView)) + .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView)) + .withSideInputs(numShardsView, writeOperationView)); + } else { + numShardsView = null; + results = input + .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider))) + .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles<>(writeOperationView, null)) + .withSideInputs(writeOperationView)); + } } results.setCoder(writeOperation.getWriterResultCoder()); - final PCollectionView<Iterable<WriteT>> resultsView = - results.apply(View.<WriteT>asIterable()); - - // Finalize the write in another do-once ParDo on
the singleton collection containing the - // Writer. The results from the per-bundle writes are given as an Iterable side input. - // The WriteOperation's state is the same as after its initialization in the first do-once - // ParDo. There is a dependency between this ParDo and the parallel write (the writer results - // collection as a side input), so it will happen after the parallel write. - ImmutableList.Builder<PCollectionView<?>> sideInputs = - ImmutableList.<PCollectionView<?>>builder().add(resultsView); - if (numShards != null) { - sideInputs.add(numShards); - } - operationCollection - .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - WriteOperation<T, WriteT> writeOperation = c.element(); - LOG.info("Finalizing write operation {}.", writeOperation); - List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView)); - LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); - - // We must always output at least 1 shard, and honor user-specified numShards if set. - int minShardsNeeded; - if (numShards == null) { - minShardsNeeded = 1; - } else { - minShardsNeeded = c.sideInput(numShards); - checkArgument( - minShardsNeeded > 0, - "Must have a positive number of shards for non-runner-determined sharding." - + " Got %s", - minShardsNeeded); + if (windowedWrites) { + // When processing streaming windowed writes, results will arrive multiple times. This + // means we can't share the below implementation that turns the results into a side input, + // as new data arriving into a side input does not trigger the listening DoFn. Instead + // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered + // whenever new data arrives. 
+ PCollection<KV<Void, WriteT>> keyedResults = + results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null)); + keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation + .getWriterResultCoder())); + + // Is the continuation trigger sufficient? + keyedResults + .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create()) + .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); + LOG.info("Finalizing write operation {}.", writeOperation); + List<WriteT> results = Lists.newArrayList(c.element().getValue()); + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); } - int extraShardsNeeded = minShardsNeeded - results.size(); - if (extraShardsNeeded > 0) { - LOG.info( - "Creating {} empty output shards in addition to {} written for a total of {}.", - extraShardsNeeded, results.size(), minShardsNeeded); - for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); - writer.open(UUID.randomUUID().toString()); - WriteT emptyWrite = writer.close(); - results.add(emptyWrite); + }).withSideInputs(writeOperationView)); + } else { + final PCollectionView<Iterable<WriteT>> resultsView = + results.apply(View.<WriteT>asIterable()); + ImmutableList.Builder<PCollectionView<?>> sideInputs = + ImmutableList.<PCollectionView<?>>builder().add(resultsView); + if (numShardsView != null) { + sideInputs.add(numShardsView); + } + + // Finalize the write in another do-once ParDo on the singleton collection containing the + // Writer. The results from the per-bundle writes are given as an Iterable side input. + // The WriteOperation's state is the same as after its initialization in the first do-once + // ParDo. 
There is a dependency between this ParDo and the parallel write (the writer + // results collection as a side input), so it will happen after the parallel write. + // For the non-windowed case, we guarantee that if no data is written but the user has + // set numShards, then all shards will be written out as empty files. For this reason we + // use a side input here. + operationCollection + .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation<T, WriteT> writeOperation = c.element(); + LOG.info("Finalizing write operation {}.", writeOperation); + List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView)); + LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); + + // We must always output at least 1 shard, and honor user-specified numShards if + // set. + int minShardsNeeded; + if (numShardsView != null) { + minShardsNeeded = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + minShardsNeeded = numShardsProvider.get(); + } else { + minShardsNeeded = 1; } - LOG.debug("Done creating extra shards."); + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written for a total of " + + " {}.", extraShardsNeeded, results.size(), minShardsNeeded); + for (int i = 0; i < extraShardsNeeded; ++i) { + Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + WriteT emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards."); + } + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); } - - writeOperation.finalize(results, c.getPipelineOptions()); - LOG.debug("Done 
finalizing write operation {}", writeOperation); - } - }).withSideInputs(sideInputs.build())); - return PDone.in(input.getPipeline()); - } - - @VisibleForTesting - static class ConstantShards<T> - extends PTransform<PCollection<T>, PCollectionView<Integer>> { - private final ValueProvider<Integer> numShards; - - private ConstantShards(ValueProvider<Integer> numShards) { - this.numShards = numShards; - } - - @Override - public PCollectionView<Integer> expand(PCollection<T> input) { - return input - .getPipeline() - .apply(Create.of(0)) - .apply( - "FixedNumShards", - ParDo.of( - new DoFn<Integer, Integer>() { - @ProcessElement - public void outputNumShards(ProcessContext ctxt) { - checkArgument( - numShards.isAccessible(), - "NumShards must be accessible at runtime to use constant sharding"); - ctxt.output(numShards.get()); - } - })) - .apply(View.<Integer>asSingleton()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add( - DisplayData.item("Fixed Number of Shards", numShards).withLabel("ConstantShards")); - } - - public ValueProvider<Integer> getNumShards() { - return numShards; + }).withSideInputs(sideInputs.build())); } + return PDone.in(input.getPipeline()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java index 6937e93..2159c8f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java @@ -176,7 +176,7 @@ public class XmlSink { * <p>The specified class must be able to be used to create a JAXB context. 
*/ public <T> Bound<T> ofRecordClass(Class<T> classToBind) { - return new Bound<>(classToBind, rootElementName, baseOutputFilename.get()); + return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get()); } /** @@ -194,7 +194,7 @@ public class XmlSink { * supplied name. */ public Bound<T> withRootElement(String rootElementName) { - return new Bound<>(classToBind, rootElementName, baseOutputFilename.get()); + return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get()); } /** @@ -205,7 +205,7 @@ public class XmlSink { public void validate(PipelineOptions options) { checkNotNull(classToBind, "Missing a class to bind to a JAXB context."); checkNotNull(rootElementName, "Missing a root element name."); - checkNotNull(baseOutputFilename, "Missing a filename to write to."); + checkNotNull(getBaseOutputFilenameProvider().get(), "Missing a filename to write to."); try { JAXBContext.newInstance(classToBind); } catch (JAXBException e) { http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java index 0739381..e1ad47b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java @@ -17,7 +17,10 @@ */ package org.apache.beam.sdk.testing; +import com.fasterxml.jackson.annotation.JsonIgnore; + import javax.annotation.Nullable; + import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -35,10 +38,12 @@ public interface TestPipelineOptions extends PipelineOptions { void setTempRoot(String value); 
@Default.InstanceFactory(AlwaysPassMatcherFactory.class) + @JsonIgnore SerializableMatcher<PipelineResult> getOnCreateMatcher(); void setOnCreateMatcher(SerializableMatcher<PipelineResult> value); @Default.InstanceFactory(AlwaysPassMatcherFactory.class) + @JsonIgnore SerializableMatcher<PipelineResult> getOnSuccessMatcher(); void setOnSuccessMatcher(SerializableMatcher<PipelineResult> value); http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index dd81a34..6f6ba37 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; @@ -45,6 +46,7 @@ import java.util.LinkedList; import java.util.List; import java.util.regex.Matcher; import javax.annotation.Nullable; + import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,17 +177,20 @@ public class FileIOChannelFactory implements IOChannelFactory { } @Override - public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { + public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) throws + IOException { + List<String> srcList = Lists.newArrayList(srcFilenames); + List<String> destList = Lists.newArrayList(destFilenames); 
checkArgument( - srcFilenames.size() == destFilenames.size(), + srcList.size() == destList.size(), "Number of source files %s must equal number of destination files %s", - srcFilenames.size(), - destFilenames.size()); - int numFiles = srcFilenames.size(); + srcList.size(), + destList.size()); + int numFiles = srcList.size(); for (int i = 0; i < numFiles; i++) { - String src = srcFilenames.get(i); - String dst = destFilenames.get(i); - LOG.debug("Copying {} to {}", src, dst); + String src = srcList.get(i); + String dst = destList.get(i); + LOG.info("Copying {} to {}", src, dst); try { // Copy the source file, replacing the existing destination. // Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter. @@ -194,7 +199,7 @@ public class FileIOChannelFactory implements IOChannelFactory { new File(dst).toPath(), StandardCopyOption.REPLACE_EXISTING); } catch (NoSuchFileException e) { - LOG.debug("{} does not exist.", src); + LOG.info("{} does not exist.", src); // Suppress exception if file does not exist. 
} } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java index 9f99cd6..745dcb9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -99,7 +99,8 @@ public class GcsIOChannelFactory implements IOChannelFactory { } @Override - public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { + public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) + throws IOException { options.getGcsUtil().copy(srcFilenames, destFilenames); } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 14781c4..1c853bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; + import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -635,23 +636,27 @@ public class GcsUtil { return batches; } - public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { + public void copy(Iterable<String> srcFilenames, + Iterable<String> 
destFilenames) throws + IOException { executeBatches(makeCopyBatches(srcFilenames, destFilenames)); } - List<BatchRequest> makeCopyBatches(List<String> srcFilenames, List<String> destFilenames) + List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames) throws IOException { + List<String> srcList = Lists.newArrayList(srcFilenames); + List<String> destList = Lists.newArrayList(destFilenames); checkArgument( - srcFilenames.size() == destFilenames.size(), + srcList.size() == destList.size(), "Number of source files %s must equal number of destination files %s", - srcFilenames.size(), - destFilenames.size()); + srcList.size(), + destList.size()); List<BatchRequest> batches = new LinkedList<>(); BatchRequest batch = createBatchRequest(); - for (int i = 0; i < srcFilenames.size(); i++) { - final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i)); - final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i)); + for (int i = 0; i < srcList.size(); i++) { + final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); + final GcsPath destPath = GcsPath.fromUri(destList.get(i)); enqueueCopy(sourcePath, destPath, batch); if (batch.size() >= MAX_REQUESTS_PER_BATCH) { batches.add(batch); http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java index 9504f45..3a3af17 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java @@ -23,7 +23,6 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Path; import java.util.Collection; -import java.util.List; 
/** * Defines a factory for working with read and write channels. @@ -116,7 +115,7 @@ public interface IOChannelFactory { * @param srcFilenames the source filenames. * @param destFilenames the destination filenames. */ - void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException; + void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) throws IOException; /** * Removes a collection of files or directories. http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 19f5ffa..f3dbb05 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -31,13 +31,18 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Random; import java.util.Set; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; @@ -48,18 +53,29 @@ import org.apache.avro.reflect.Nullable; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroIO.Write.Bound; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import 
org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; @@ -94,7 +110,7 @@ public class AvroIOTest { } @Test - public void testWriteWithoutValidationFlag() throws Exception { + public void testWriteWithoutValidationFlag() throws Exception { AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write.to("gs://bucket/foo/baz"); assertTrue(write.needsValidation()); assertFalse(write.withoutValidation().needsValidation()); @@ -275,6 +291,132 @@ public class AvroIOTest { p.run(); } + private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) { + return TimestampedValue.of(element, new Instant(0).plus(duration)); + } + + private static class WindowedFilenamePolicy extends FilenamePolicy { + String outputFilePrefix; + + WindowedFilenamePolicy(String outputFilePrefix) { + this.outputFilePrefix = outputFilePrefix; + } + + @Override + public ValueProvider<String> getBaseOutputFilenameProvider() { + return
StaticValueProvider.of(outputFilePrefix); + } + + @Override + public String windowedFilename(WindowedContext input) { + String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-" + + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-" + + input.getPaneInfo().getIndex(); + if (input.getPaneInfo().isLast()) { + filename += "-final"; + } + return filename; + } + + @Override + public String unwindowedFilename(Context input) { + String filename = outputFilePrefix + input.getShardNumber() + "-of-" + + (input.getNumShards() - 1); + return filename; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix) + .withLabel("File Name Prefix")); + } + } + + @Rule + public TestPipeline windowedAvroWritePipeline = TestPipeline.create(); + + @Test + @Category({ValidatesRunner.class, UsesTestStream.class }) + public void testWindowedAvroIOWrite() throws Throwable { + File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); + final String outputFilePrefix = baseOutputFile.getAbsolutePath(); + + Instant base = new Instant(0); + ArrayList<GenericClass> allElements = new ArrayList<>(); + ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new ArrayList<>(); + ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList( + base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)), + base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30))); + + Random random = new Random(); + for (int i = 0; i < 100; ++i) { + GenericClass item = new GenericClass(i, String.valueOf(i)); + allElements.add(item); + firstWindowElements.add(TimestampedValue.of(item, + firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size())))); + } + + ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new ArrayList<>(); + ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList( + 
base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)), + base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90))); + for (int i = 100; i < 200; ++i) { + GenericClass item = new GenericClass(i, String.valueOf(i)); + allElements.add(new GenericClass(i, String.valueOf(i))); + secondWindowElements.add(TimestampedValue.of(item, + secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size())))); + } + + + TimestampedValue<GenericClass>[] firstWindowArray = + firstWindowElements.toArray(new TimestampedValue[100]); + TimestampedValue<GenericClass>[] secondWindowArray = + secondWindowElements.toArray(new TimestampedValue[100]); + + TestStream<GenericClass> values = TestStream.create(AvroCoder.of(GenericClass.class)) + .advanceWatermarkTo(new Instant(0)) + .addElements(firstWindowArray[0], + Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length)) + .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1))) + .addElements(secondWindowArray[0], + Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length)) + .advanceWatermarkToInfinity(); + + windowedAvroWritePipeline + .apply(values) + .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1)))) + .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix)) + .withWindowedWrites() + .withNumShards(2) + .withSchema(GenericClass.class)); + windowedAvroWritePipeline.run(); + + // Validate that the data written matches the expected elements in the expected order + List<File> expectedFiles = new ArrayList<>(); + for (int shard = 0; shard < 2; shard++) { + for (int window = 0; window < 2; window++) { + Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window)); + IntervalWindow intervalWindow = new IntervalWindow( + windowStart, Duration.standardMinutes(1)); + expectedFiles.add( + new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard + + "-of-1" + "-pane-0-final")); + } + } + + 
List<GenericClass> actualElements = new ArrayList<>(); + for (File outputFile : expectedFiles) { + assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists()); + try (DataFileReader<GenericClass> reader = + new DataFileReader<>(outputFile, AvroCoder.of( + GenericClass.class).createDatumReader())) { + Iterators.addAll(actualElements, reader); + } + outputFile.delete(); + } + assertThat(actualElements, containsInAnyOrder(allElements.toArray())); + } + @Test public void testWriteWithDefaultCodec() throws Exception { AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write @@ -347,8 +489,10 @@ public class AvroIOTest { Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class); if (numShards > 1) { + System.out.println("NumShards " + numShards); write = write.withNumShards(numShards); } else { + System.out.println("no sharding"); write = write.withoutSharding(); } p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write); http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index d2c1968..5b81ba8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -17,13 +17,13 @@ */ package org.apache.beam.sdk.io; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.Lists; + import java.io.BufferedReader; import java.io.File; import 
java.io.FileInputStream; @@ -41,13 +41,17 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.zip.GZIPInputStream; import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; import org.apache.beam.sdk.io.FileBasedSink.FileResult; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -99,7 +103,7 @@ public class FileBasedSinkTest { expected.addAll(values); expected.add(SimpleSink.SimpleWriter.FOOTER); - writer.open(testUid); + writer.openUnwindowed(testUid, -1, -1); for (String value : values) { writer.write(value); } @@ -215,20 +219,18 @@ public class FileBasedSinkTest { int numFiles = temporaryFiles.size(); - List<File> outputFiles = new ArrayList<>(); List<FileResult> fileResults = new ArrayList<>(); - List<String> outputFilenames = writeOp.generateDestinationFilenames(numFiles); - - // Create temporary output bundles and output File objects + // Create temporary output bundles and output File objects. 
for (int i = 0; i < numFiles; i++) { - fileResults.add(new FileResult(temporaryFiles.get(i).toString())); - outputFiles.add(new File(outputFilenames.get(i))); + fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null)); } writeOp.finalize(fileResults, options); for (int i = 0; i < numFiles; i++) { - assertTrue(outputFiles.get(i).exists()); + String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename( + new Context(i, numFiles)); + assertTrue(new File(outputFilename).exists()); assertFalse(temporaryFiles.get(i).exists()); } @@ -258,7 +260,7 @@ public class FileBasedSinkTest { outputFiles.add(outputFile); } - writeOp.removeTemporaryFiles(Collections.<String>emptyList(), options); + writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options); for (int i = 0; i < numFiles; i++) { assertFalse(temporaryFiles.get(i).exists()); @@ -274,12 +276,12 @@ public class FileBasedSinkTest { PipelineOptions options = PipelineOptionsFactory.create(); SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); - List<String> inputFilenames = Arrays.asList("input-3", "input-2", "input-1"); - List<String> inputContents = Arrays.asList("3", "2", "1"); + List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3"); + List<String> inputContents = Arrays.asList("1", "2", "3"); List<String> expectedOutputFilenames = Arrays.asList( - "output-00002-of-00003.test", "output-00001-of-00003.test", "output-00000-of-00003.test"); + "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test"); - List<String> inputFilePaths = new ArrayList<>(); + Map<String, String> inputFilePaths = new HashMap<>(); List<String> expectedOutputPaths = new ArrayList<>(); for (int i = 0; i < inputFilenames.size(); i++) { @@ -291,14 +293,13 @@ public class FileBasedSinkTest { File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i)); List<String> lines = Arrays.asList(inputContents.get(i)); writeFile(lines, 
inputTmpFile); - inputFilePaths.add(inputTmpFile.toString()); + inputFilePaths.put(inputTmpFile.toString(), + writeOp.getSink().getFileNamePolicy().unwindowedFilename( + new Context(i, inputFilenames.size()))); } // Copy input files to output files. - List<String> actual = writeOp.copyToOutputFiles(inputFilePaths, options); - - // Assert that the expected paths are returned. - assertThat(expectedOutputPaths, containsInAnyOrder(actual.toArray())); + writeOp.copyToOutputFiles(inputFilePaths, options); // Assert that the contents were copied. for (int i = 0; i < expectedOutputPaths.size(); i++) { @@ -306,6 +307,14 @@ public class FileBasedSinkTest { } } + public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) { + List<String> filenames = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + filenames.add(policy.unwindowedFilename(new Context(i, numFiles))); + } + return filenames; + } + /** * Output filenames use the supplied naming template. */ @@ -314,36 +323,35 @@ public class FileBasedSinkTest { List<String> expected; List<String> actual; SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN"); - SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink); + FilenamePolicy policy = sink.getFileNamePolicy(); expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"), appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test")); - actual = writeOp.generateDestinationFilenames(3); + actual = generateDestinationFilenames(policy, 3); assertEquals(expected, actual); expected = Arrays.asList(appendToTempFolder("output.00.of.01.test")); - actual = writeOp.generateDestinationFilenames(1); + actual = generateDestinationFilenames(policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = writeOp.generateDestinationFilenames(0); + actual = generateDestinationFilenames(policy, 0); assertEquals(expected, actual); // Also validate that 
we handle the case where the user specified "." that we do // not prefix an additional "." making "..test" sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN"); - writeOp = new SimpleSink.SimpleWriteOperation(sink); expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"), appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test")); - actual = writeOp.generateDestinationFilenames(3); + actual = generateDestinationFilenames(policy, 3); assertEquals(expected, actual); expected = Arrays.asList(appendToTempFolder("output.00.of.01.test")); - actual = writeOp.generateDestinationFilenames(1); + actual = generateDestinationFilenames(policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = writeOp.generateDestinationFilenames(0); + actual = generateDestinationFilenames(policy, 0); assertEquals(expected, actual); } @@ -355,20 +363,21 @@ public class FileBasedSinkTest { List<String> expected; List<String> actual; SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); + FilenamePolicy policy = writeOp.getSink().getFileNamePolicy(); expected = Arrays.asList( appendToTempFolder("output-00000-of-00003.test"), appendToTempFolder("output-00001-of-00003.test"), appendToTempFolder("output-00002-of-00003.test")); - actual = writeOp.generateDestinationFilenames(3); + actual = generateDestinationFilenames(policy, 3); assertEquals(expected, actual); expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test")); - actual = writeOp.generateDestinationFilenames(1); + actual = generateDestinationFilenames(policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = writeOp.generateDestinationFilenames(0); + actual = generateDestinationFilenames(policy, 0); assertEquals(expected, actual); } @@ -380,16 +389,17 @@ public class FileBasedSinkTest { SimpleSink sink = new SimpleSink("output", "test", "-NN"); SimpleSink.SimpleWriteOperation writeOp = new 
SimpleSink.SimpleWriteOperation(sink); - // A single shard doesn't need to include the shard number. - assertEquals(Arrays.asList("output-01.test"), - writeOp.generateDestinationFilenames(1)); - // More than one shard does. try { - writeOp.generateDestinationFilenames(3); + Iterable<FileResult> results = Lists.newArrayList( + new FileResult("temp1", "file1"), + new FileResult("temp2", "file1"), + new FileResult("temp3", "file1")); + + writeOp.buildOutputFilenames(results); fail("Should have failed."); } catch (IllegalStateException exn) { - assertEquals("Shard name template '-NN' only generated 1 distinct file names for 3 files.", + assertEquals("Only generated 1 distinct file names for 3 files.", exn.getMessage()); } } @@ -402,19 +412,19 @@ public class FileBasedSinkTest { List<String> expected; List<String> actual; SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), ""); - SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink); + FilenamePolicy policy = sink.getFileNamePolicy(); expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"), appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003")); - actual = writeOp.generateDestinationFilenames(3); + actual = generateDestinationFilenames(policy, 3); assertEquals(expected, actual); expected = Arrays.asList(appendToTempFolder("output-00000-of-00001")); - actual = writeOp.generateDestinationFilenames(1); + actual = generateDestinationFilenames(policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = writeOp.generateDestinationFilenames(0); + actual = generateDestinationFilenames(policy, 0); assertEquals(expected, actual); } @@ -513,7 +523,7 @@ public class FileBasedSinkTest { expected.add("footer"); expected.add("footer"); - writer.open(testUid); + writer.openUnwindowed(testUid, -1, -1); writer.write("a"); writer.write("b"); final FileResult result = writer.close(); 
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 3ecbed4..16d7f2a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -49,10 +49,10 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Sink.WriteOperation; import org.apache.beam.sdk.io.Sink.Writer; -import org.apache.beam.sdk.io.Write.ConstantShards; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -63,11 +63,12 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; @@ -311,8 +312,10 @@ public class WriteTest { 
assertThat(write.getSink(), is(sink)); PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = write.getSharding(); - assertThat(write.getSharding(), instanceOf(ConstantShards.class)); - assertThat(((ConstantShards<String>) write.getSharding()).getNumShards().get(), equalTo(3)); + + assertThat(write.getSharding(), is(nullValue())); + assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class)); + assertThat(write.getNumShards().get(), equalTo(3)); assertThat(write.getSharding(), equalTo(originalSharding)); Write<String> write2 = write.withSharding(SHARDING_TRANSFORM); @@ -352,7 +355,7 @@ public class WriteTest { DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("sink", sink.getClass())); assertThat(displayData, includesDisplayDataFor("sink", sink)); - assertThat(displayData, hasDisplayItem("Fixed Number of Shards", 1)); + assertThat(displayData, hasDisplayItem("numShards", "1")); } @Test @@ -383,17 +386,6 @@ public class WriteTest { assertThat(displayData, hasDisplayItem("spam", "ham")); } - @Test - public void testWriteUnbounded() { - PCollection<String> unbounded = p.apply(CountingInput.unbounded()) - .apply(ToString.elements()); - - TestSink sink = new TestSink(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Write can only be applied to a Bounded PCollection"); - unbounded.apply(Write.to(sink)); - } - /** * Performs a Write transform and verifies the Write transform calls the appropriate methods on * a test sink in the correct order, as well as verifies that the elements of a PCollection are @@ -535,6 +527,10 @@ public class WriteTest { } @Override + public void setWindowedWrites(boolean windowedWrites) { + } + + @Override public void finalize(Iterable<TestWriterResult> bundleResults, PipelineOptions options) throws Exception { assertEquals("test_value", options.as(WriteOptions.class).getTestFlag()); @@ -633,7 +629,21 @@ public class WriteTest { } @Override - 
public void open(String uId) throws Exception { + public final void openWindowed(String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int nShards) throws Exception { + numShards.incrementAndGet(); + this.uId = uId; + assertEquals(State.INITIAL, state); + state = State.OPENED; + } + + @Override + public final void openUnwindowed(String uId, + int shard, + int nShards) throws Exception { numShards.incrementAndGet(); this.uId = uId; assertEquals(State.INITIAL, state); @@ -653,8 +663,13 @@ public class WriteTest { state = State.CLOSED; return new TestWriterResult(uId, elementsWritten); } + + @Override + public void cleanup() throws Exception { + } } + /** * Options for test, exposed for PipelineOptionsFactory. */ http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java index 96b8c57..63b5d11 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java @@ -93,7 +93,7 @@ public class XmlSinkTest { .withRootElement(testRootElement); assertEquals(testClass, sink.classToBind); assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.baseOutputFilename.get()); + assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get()); } /** @@ -105,7 +105,7 @@ public class XmlSinkTest { XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix); assertEquals(testClass, sink.classToBind); assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.baseOutputFilename.get()); + assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get()); } /** @@ -142,9 +142,9 @@ public class XmlSinkTest { 
XmlSink.writeOf(testClass, testRootElement, testFilePrefix); XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options); assertEquals(testClass, writeOp.getSink().classToBind); - assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get()); + assertEquals(testFilePrefix, writeOp.getSink().getBaseOutputFilenameProvider().get()); assertEquals(testRootElement, writeOp.getSink().rootElementName); - assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension); + // assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().getFilenamePolicy().extension); Path outputPath = new File(testFilePrefix).toPath(); Path tempPath = new File(writeOp.tempDirectory.get()).toPath(); assertEquals(outputPath.getParent(), tempPath.getParent()); http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 084d303..2ddead7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -162,23 +162,6 @@ public class TestPipelineTest implements Serializable { } @Test - public void testMatcherSerializationDeserialization() { - TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class); - SerializableMatcher<PipelineResult> m1 = new TestMatcher(); - SerializableMatcher<PipelineResult> m2 = new TestMatcher(); - - opts.setOnCreateMatcher(m1); - opts.setOnSuccessMatcher(m2); - - String[] arr = TestPipeline.convertToArgs(opts); - TestPipelineOptions newOpts = - PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class); - - assertEquals(m1, newOpts.getOnCreateMatcher()); - assertEquals(m2, 
newOpts.getOnSuccessMatcher()); - } - - @Test public void testRunWithDummyEnvironmentVariableFails() { System.getProperties() .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true)); http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java index 9b085ca..10ff788 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java @@ -40,6 +40,8 @@ import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -284,6 +286,10 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> { } @Override + public void setWindowedWrites(boolean windowedWrites) { + } + + @Override public void finalize(final Iterable<String> writerResults, PipelineOptions options) throws Exception { UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() { @@ -298,7 +304,6 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> { private void doFinalize(Iterable<String> writerResults) throws Exception { Job job = sink.newJob(); FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration()); - // If there are 0 output shards, just create output folder. 
if (!writerResults.iterator().hasNext()) { fs.mkdirs(new Path(path)); @@ -389,7 +394,17 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> { } @Override - public void open(final String uId) throws Exception { + public void openWindowed(final String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int numShards) throws Exception { + throw new UnsupportedOperationException("Windowing support not implemented yet for" + + "HDFS. Window " + window); + } + + @Override + public void openUnwindowed(final String uId, int shard, int numShards) throws Exception { UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( new PrivilegedExceptionAction<Void>() { @Override @@ -427,6 +442,11 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> { } @Override + public void cleanup() throws Exception { + + } + + @Override public String close() throws Exception { return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( new PrivilegedExceptionAction<String>() { http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java index 8b9a6d1..cedd812 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java @@ -66,7 +66,7 @@ public class HDFSFileSinkTest { Sink.WriteOperation<T, String> writeOperation = (Sink.WriteOperation<T, String>) sink.createWriteOperation(options); Sink.Writer<T, String> writer = writeOperation.createWriter(options); - writer.open(UUID.randomUUID().toString()); + writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1); for (T t: toWrite) { writer.write(t); }
