This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit d314339ed2f8d5ce385c7b40705ef13f6ea43b45 Author: Eugene Kirpichov <[email protected]> AuthorDate: Thu Nov 30 13:05:00 2017 -0800 Reintroduces dynamic sharding with windowed writes for bounded collections --- .../apache/beam/examples/WindowedWordCount.java | 5 ++-- .../examples/common/WriteOneFilePerWindow.java | 12 ++++++---- .../apache/beam/examples/WindowedWordCountIT.java | 8 +++++++ .../beam/runners/apex/examples/WordCountTest.java | 2 +- .../construction/WriteFilesTranslationTest.java | 1 + .../beam/runners/spark/io/AvroPipelineTest.java | 5 ++-- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 27 +++------------------- .../java/org/apache/beam/sdk/io/WriteFiles.java | 27 +++++++++++----------- .../org/apache/beam/sdk/io/WriteFilesTest.java | 5 ++-- 9 files changed, 41 insertions(+), 51 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 21cfed8..b31ce4a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -162,9 +162,8 @@ public class WindowedWordCount { void setMaxTimestampMillis(Long value); @Description("Fixed number of shards to produce per window") - @Default.Integer(3) - int getNumShards(); - void setNumShards(int numShards); + Integer getNumShards(); + void setNumShards(Integer numShards); } public static void main(String[] args) throws IOException { diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java index a5c84f6..abd14b7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -19,6 +19,7 @@ package 
org.apache.beam.examples.common; import static com.google.common.base.MoreObjects.firstNonNull; +import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; @@ -45,9 +46,10 @@ import org.joda.time.format.ISODateTimeFormat; public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> { private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute(); private String filenamePrefix; - private int numShards; + @Nullable + private Integer numShards; - public WriteOneFilePerWindow(String filenamePrefix, int numShards) { + public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) { this.filenamePrefix = filenamePrefix; this.numShards = numShards; } @@ -59,8 +61,10 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone TextIO.write() .to(new PerWindowFiles(resource)) .withTempDirectory(resource.getCurrentDirectory()) - .withWindowedWrites() - .withNumShards(numShards); + .withWindowedWrites(); + if (numShards != null) { + write = write.withNumShards(numShards); + } return input.apply(write); } diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 279de53..2f4ef34 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -87,6 +87,14 @@ public class WindowedWordCountIT { } @Test + public void testWindowedWordCountInBatchDynamicSharding() throws Exception { + WindowedWordCountITOptions options = batchOptions(); + // This is the default value, but make it explicit. 
+ options.setNumShards(null); + testWindowedWordCountPipeline(options); + } + + @Test public void testWindowedWordCountInBatchStaticSharding() throws Exception { WindowedWordCountITOptions options = batchOptions(); options.setNumShards(3); diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java index e050c15..ba75746 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java @@ -108,7 +108,7 @@ public class WordCountTest { .apply(ParDo.of(new ExtractWordsFn())) .apply(Count.<String>perElement()) .apply(ParDo.of(new FormatAsStringFn())) - .apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(2)) + .apply("WriteCounts", TextIO.write().to(options.getOutput())) ; p.run().waitUntilFinish(); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index 2d45681..038653d 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ -64,6 +64,7 @@ public class WriteFilesTranslationTest { public static Iterable<WriteFiles<Object, Void, Object>> data() { return ImmutableList.of( WriteFiles.to(new DummySink()), + WriteFiles.to(new DummySink()).withWindowedWrites(), WriteFiles.to(new DummySink()).withNumShards(17), WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42)); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index e17a6b8..fc65aac 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -74,8 +74,7 @@ public class AvroPipelineTest { AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath())); input.apply( AvroIO.writeGenericRecords(schema) - .to(outputFile.getAbsolutePath()) - .withoutSharding()); + .to(outputFile.getAbsolutePath())); pipeline.run(); List<GenericRecord> records = readGenericFile(); @@ -100,7 +99,7 @@ public class AvroPipelineTest { List<GenericRecord> records = Lists.newArrayList(); GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(); try (DataFileReader<GenericRecord> dataFileReader = - new DataFileReader<>(outputFile, genericDatumReader)) { + new DataFileReader<>(new File(outputFile + "-00000-of-00001"), genericDatumReader)) { for (GenericRecord record : dataFileReader) { records.add(record); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 48d7521..2e5d387 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import java.io.IOException; import java.io.InputStream; @@ -43,7 +42,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -634,28 +632,9 @@ 
public abstract class FileBasedSink<UserT, DestinationT, OutputT> if (numShards != null) { resultsWithShardNumbers = Lists.newArrayList(completeResults); } else { - checkState( - !windowedWrites, - "When doing windowed writes, shards should have been assigned when writing"); - // Sort files for idempotence. Sort by temporary filename. - // Note that this codepath should not be used when processing triggered windows. In the - // case of triggers, the list of FileResult objects in the Finalize iterable is not - // deterministic, and might change over retries. This breaks the assumption below that - // sorting the FileResult objects provides idempotency. - List<FileResult<DestinationT>> sortedByTempFilename = - Ordering.from( - new Comparator<FileResult<DestinationT>>() { - @Override - public int compare( - FileResult<DestinationT> first, FileResult<DestinationT> second) { - String firstFilename = first.getTempFilename().toString(); - String secondFilename = second.getTempFilename().toString(); - return firstFilename.compareTo(secondFilename); - } - }) - .sortedCopy(completeResults); - for (int i = 0; i < sortedByTempFilename.size(); i++) { - resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i)); + int i = 0; + for (FileResult<DestinationT> res : completeResults) { + resultsWithShardNumbers.add(res.withShard(i++)); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 54f055d..499a194 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import 
com.google.common.base.Objects; @@ -42,8 +41,8 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -286,13 +285,12 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> getWindowedWrites(), "Must use windowed writes when applying %s to an unbounded PCollection", WriteFiles.class.getSimpleName()); - } - if (getWindowedWrites()) { // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438 // and similar behavior in other runners. checkArgument( getComputeNumShards() != null || getNumShardsProvider() != null, - "When using windowed writes, must specify number of output shards explicitly", + "When applying %s to an unbounded PCollection, " + + "must specify number of output shards explicitly", WriteFiles.class.getSimpleName()); } this.writeOperation = getSink().createWriteOperation(); @@ -364,7 +362,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> } private class GatherResults<ResultT> - extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> { + extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>> { private final Coder<ResultT> resultCoder; private GatherResults(Coder<ResultT> resultCoder) { @@ -372,7 +370,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> } @Override - public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) { + public PCollection<List<ResultT>> expand(PCollection<ResultT> input) { if (getWindowedWrites()) { // Reshuffle the results to make them stable against retries. 
// Use a single void key to maximize size of bundles for finalization. @@ -381,7 +379,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> .apply("Reshuffle", Reshuffle.<Void, ResultT>of()) .apply("Drop key", Values.<ResultT>create()) .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>())) - .setCoder(IterableCoder.of(resultCoder)); + .setCoder(ListCoder.of(resultCoder)) + // Reshuffle one more time to stabilize the contents of the bundle lists to finalize. + .apply(Reshuffle.<List<ResultT>>viaRandomKey()); } else { // Pass results via a side input rather than reshuffle, because we need to get an empty // iterable to finalize if there are no results. @@ -389,7 +389,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> .getPipeline() .apply( Reify.viewInGlobalWindow( - input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder))); + input.apply(View.<ResultT>asList()), ListCoder.of(resultCoder))); } } } @@ -742,7 +742,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> private class FinalizeTempFileBundles extends PTransform< - PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> { + PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> { @Nullable private final PCollectionView<Integer> numShardsView; private final Coder<DestinationT> destinationCoder; @@ -754,7 +754,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> @Override public WriteFilesResult<DestinationT> expand( - PCollection<Iterable<FileResult<DestinationT>>> input) { + PCollection<List<FileResult<DestinationT>>> input) { List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs()); if (numShardsView != null) { @@ -772,7 +772,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> } private class FinalizeFn - extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> { + extends DoFn<List<FileResult<DestinationT>>, 
KV<DestinationT, String>> { @ProcessElement public void process(ProcessContext c) throws Exception { getDynamicDestinations().setSideInputAccessorFromProcessContext(c); @@ -782,7 +782,6 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> } else if (getNumShardsProvider() != null) { fixedNumShards = getNumShardsProvider().get(); } else { - checkState(!getWindowedWrites(), "Windowed write should have set fixed sharding"); fixedNumShards = null; } List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element()); @@ -821,7 +820,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> return resultsToFinalFilenames; } - private static class GatherBundlesPerWindowFn<T> extends DoFn<T, Iterable<T>> { + private static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> { @Nullable private transient Multimap<BoundedWindow, T> bundles = null; @StartBundle diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index b68cbf9..da4e6da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -386,10 +386,11 @@ public class WriteFilesTest { @Test @Category(NeedsRunner.class) - public void testWindowedWritesNeedSharding() { + public void testUnboundedWritesNeedSharding() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage( - "When using windowed writes, must specify number of output shards explicitly"); + "When applying WriteFiles to an unbounded PCollection, " + + "must specify number of output shards explicitly"); SimpleSink<Void> sink = makeSimpleSink(); p.apply(Create.of("foo")) -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
