Add windowing support to FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6addc95f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6addc95f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6addc95f Branch: refs/heads/master Commit: 6addc95f0300a2e03109d4ad7ee93727d0a3b7b2 Parents: 570d0e2 Author: Reuven Lax <[email protected]> Authored: Thu Mar 9 09:45:35 2017 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Wed Apr 5 08:57:21 2017 -0700 ---------------------------------------------------------------------- examples/java/pom.xml | 1 - .../apache/beam/examples/WindowedWordCount.java | 34 +- .../examples/common/WriteOneFilePerWindow.java | 91 ++++ .../examples/common/WriteWindowedFilesDoFn.java | 77 ---- .../beam/examples/WindowedWordCountIT.java | 41 +- .../core/construction/PTransformMatchers.java | 3 +- .../direct/WriteWithShardingFactory.java | 6 +- .../streaming/io/UnboundedFlinkSink.java | 20 +- .../beam/runners/flink/WriteSinkITCase.java | 23 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 157 +++++-- .../org/apache/beam/sdk/io/FileBasedSink.java | 429 +++++++++++++++---- .../main/java/org/apache/beam/sdk/io/Sink.java | 55 ++- .../java/org/apache/beam/sdk/io/TextIO.java | 98 ++++- .../main/java/org/apache/beam/sdk/io/Write.java | 377 +++++++++------- .../java/org/apache/beam/sdk/io/XmlSink.java | 6 +- .../beam/sdk/testing/TestPipelineOptions.java | 5 + .../beam/sdk/util/FileIOChannelFactory.java | 23 +- .../beam/sdk/util/GcsIOChannelFactory.java | 3 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 21 +- .../apache/beam/sdk/util/IOChannelFactory.java | 3 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 146 ++++++- .../apache/beam/sdk/io/FileBasedSinkTest.java | 94 ++-- .../java/org/apache/beam/sdk/io/WriteTest.java | 49 ++- .../org/apache/beam/sdk/io/XmlSinkTest.java | 8 +- .../beam/sdk/testing/TestPipelineTest.java | 17 - .../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 24 +- 
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 2 +- 27 files changed, 1295 insertions(+), 518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 2b18130..021a819 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -209,7 +209,6 @@ <configuration> <includes> <include>WordCountIT.java</include> - <include>WindowedWordCountIT.java</include> </includes> <parallel>all</parallel> <threadCount>4</threadCount> http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 5c19454..d88de54 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; import org.apache.beam.examples.common.ExampleOptions; -import org.apache.beam.examples.common.WriteWindowedFilesDoFn; +import org.apache.beam.examples.common.WriteOneFilePerWindow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; @@ -31,11 +31,9 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; +import 
org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -203,33 +201,13 @@ public class WindowedWordCount { PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords()); /** - * Concept #5: Customize the output format using windowing information - * - * <p>At this point, the data is organized by window. We're writing text files and and have no - * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get - * one output file per window. (if we had late data this key would not be unique) - * - * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will - * be automatically detected and populated with the window for the current element. - */ - PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow = - wordCounts.apply( - ParDo.of( - new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() { - @ProcessElement - public void processElement(ProcessContext context, IntervalWindow window) { - context.output(KV.of(window, context.element())); - } - })); - - /** - * Concept #6: Format the results and write to a sharded file partitioned by window, using a + * Concept #5: Format the results and write to a sharded file partitioned by window, using a * simple ParDo operation. Because there may be failures followed by retries, the * writes must be idempotent, but the details of writing to files is elided here. 
*/ - keyedByWindow - .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create()) - .apply(ParDo.of(new WriteWindowedFilesDoFn(output))); + wordCounts + .apply(MapElements.via(new WordCount.FormatAsTextFn())) + .apply(new WriteOneFilePerWindow(output)); PipelineResult result = pipeline.run(); try { http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java new file mode 100644 index 0000000..2ed8a74 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.common; + +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; + +/** + * A {@link DoFn} that writes elements to files with names deterministically derived from the lower + * and upper bounds of their key (an {@link IntervalWindow}). + * + * <p>This is test utility code, not for end-users, so examples can be focused on their primary + * lessons. + */ +public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> { + + private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute(); + private String filenamePrefix; + + public WriteOneFilePerWindow(String filenamePrefix) { + this.filenamePrefix = filenamePrefix; + } + + @Override + public PDone expand(PCollection<String> input) { + return input.apply( + TextIO.Write.to(new PerWindowFiles(filenamePrefix)).withWindowedWrites().withNumShards(3)); + } + + /** + * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data + * being written. This always includes the shard number and the total number of shards. For + * windowed writes, it also includes the window and pane index (a sequence number assigned to each + * trigger firing). 
+ */ + public static class PerWindowFiles extends FilenamePolicy { + + private final String output; + + public PerWindowFiles(String output) { + this.output = output; + } + + @Override + public ValueProvider<String> getBaseOutputFilenameProvider() { + return StaticValueProvider.of(output); + } + + public String filenamePrefixForWindow(IntervalWindow window) { + return String.format( + "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end())); + } + + @Override + public String windowedFilename(WindowedContext context) { + IntervalWindow window = (IntervalWindow) context.getWindow(); + return String.format( + "%s-%s-of-%s", + filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards()); + } + + @Override + public String unwindowedFilename(Context context) { + throw new UnsupportedOperationException("Unsupported."); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java deleted file mode 100644 index cd6baad..0000000 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.common; - -import com.google.common.annotations.VisibleForTesting; -import java.io.OutputStream; -import java.nio.channels.Channels; -import java.nio.charset.StandardCharsets; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.values.KV; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; - -/** - * A {@link DoFn} that writes elements to files with names deterministically derived from the lower - * and upper bounds of their key (an {@link IntervalWindow}). - * - * <p>This is test utility code, not for end-users, so examples can be focused - * on their primary lessons. 
- */ -public class WriteWindowedFilesDoFn - extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> { - - static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute(); - - private final String output; - - public WriteWindowedFilesDoFn(String output) { - this.output = output; - } - - @VisibleForTesting - public static String fileForWindow(String output, IntervalWindow window) { - return String.format( - "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end())); - } - - @ProcessElement - public void processElement(ProcessContext context) throws Exception { - // Build a file name from the window - IntervalWindow window = context.element().getKey(); - String outputShard = fileForWindow(output, window); - - // Open the file and write all the values - IOChannelFactory factory = IOChannelUtils.getFactory(outputShard); - OutputStream out = Channels.newOutputStream(factory.create(outputShard, "text/plain")); - for (KV<String, Long> wordCount : context.element().getValue()) { - STRING_CODER.encode( - wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER); - out.write(NEWLINE); - } - out.close(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 703f836..857f1d3 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -23,13 +23,14 @@ import com.google.api.client.util.Sleeper; import 
com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.examples.common.WriteWindowedFilesDoFn; +import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; @@ -42,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.ExplicitShardedFile; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.NumberedShardedFile; import org.apache.beam.sdk.util.ShardedFile; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; @@ -64,7 +66,7 @@ public class WindowedWordCountIT { @Rule public TestName testName = new TestName(); private static final String DEFAULT_INPUT = - "gs://apache-beam-samples/shakespeare/winterstale-personae"; + "gs://apache-beam-samples/shakespeare/sonnets.txt"; static final int MAX_READ_RETRIES = 4; static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); static final FluentBackoff BACK_OFF_FACTORY = @@ -130,14 +132,18 @@ public class WindowedWordCountIT { String outputPrefix = options.getOutput(); - List<String> expectedOutputFiles = Lists.newArrayListWithCapacity(6); + PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix); + + List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6); + for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) { - Instant windowStart = + final Instant windowStart = new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute)); 
expectedOutputFiles.add( - WriteWindowedFilesDoFn.fileForWindow( - outputPrefix, - new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))))); + new NumberedShardedFile( + filenamePolicy.filenamePrefixForWindow( + new IntervalWindow( + windowStart, windowStart.plus(Duration.standardMinutes(10)))) + "*")); } ShardedFile inputFile = new ExplicitShardedFile(Collections.singleton(options.getInputFile())); @@ -157,7 +163,7 @@ public class WindowedWordCountIT { } options.setOnSuccessMatcher( - new WordCountsMatcher(expectedWordCounts, new ExplicitShardedFile(expectedOutputFiles))); + new WordCountsMatcher(expectedWordCounts, expectedOutputFiles)); WindowedWordCount.main(TestPipeline.convertToArgs(options)); } @@ -172,24 +178,28 @@ public class WindowedWordCountIT { private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class); private final SortedMap<String, Long> expectedWordCounts; - private final ShardedFile outputFile; + private final List<ShardedFile> outputFiles; private SortedMap<String, Long> actualCounts; - public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, ShardedFile outputFile) { + public WordCountsMatcher( + SortedMap<String, Long> expectedWordCounts, List<ShardedFile> outputFiles) { this.expectedWordCounts = expectedWordCounts; - this.outputFile = outputFile; + this.outputFiles = outputFiles; } @Override public boolean matchesSafely(PipelineResult pipelineResult) { try { // Load output data - List<String> lines = - outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + List<String> outputLines = new ArrayList<>(); + for (ShardedFile outputFile : outputFiles) { + outputLines.addAll( + outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())); + } // Since the windowing is nondeterministic we only check the sums actualCounts = new TreeMap<>(); - for (String line : lines) { + for (String line : outputLines) { String[] splits = line.split(": "); String 
word = splits[0]; long count = Long.parseLong(splits[1]); @@ -205,7 +215,8 @@ public class WindowedWordCountIT { return actualCounts.equals(expectedWordCounts); } catch (Exception e) { throw new RuntimeException( - String.format("Failed to read from sharded output: %s", outputFile)); + String.format("Failed to read from sharded output: %s due to exception", + outputFiles), e); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index f4ae577..c4f1bd6 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -254,7 +254,8 @@ public class PTransformMatchers { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { if (application.getTransform() instanceof Write) { - return ((Write) application.getTransform()).getSharding() == null; + Write write = (Write) application.getTransform(); + return write.getSharding() == null && write.getNumShards() == null; } return false; } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 63122fe..1bf5839 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -35,6 +35,8 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; @@ -54,8 +56,7 @@ class WriteWithShardingFactory<InputT> @Override public PTransform<PCollection<InputT>, PDone> getReplacementTransform( Write<InputT> transform) { - - return transform.withSharding(new LogElementShardsWithDrift<InputT>()); + return transform.withSharding(new LogElementShardsWithDrift<InputT>()); } @Override @@ -74,6 +75,7 @@ class WriteWithShardingFactory<InputT> @Override public PCollectionView<Integer> expand(PCollection<T> records) { return records + .apply(Window.<T>into(new GlobalWindows())) .apply("CountRecords", Count.<T>globally()) .apply("GenerateShardCount", ParDo.of(new CalculateShardsFn())) .apply(View.<Integer>asSingleton()); http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java index 301d841..af36b80 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java @@ -28,6 +28,8 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.TypeDescriptor; @@ -63,6 +65,10 @@ public class UnboundedFlinkSink<T> extends Sink<T> { } @Override + public void setWindowedWrites(boolean windowedWrites) { + } + + @Override public void finalize(Iterable<Object> writerResults, PipelineOptions options) throws Exception { @@ -141,7 +147,19 @@ public class UnboundedFlinkSink<T> extends Sink<T> { public Writer<T, Object> createWriter(PipelineOptions options) throws Exception { return new Writer<T, Object>() { @Override - public void open(String uId) throws Exception { + public void openWindowed(String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int numShards) throws Exception { + } + + @Override + public void openUnwindowed(String uId, int shard, int numShards) throws Exception { + } + + @Override + public void cleanup() throws Exception { } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index 572c291..38b790e 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -33,6 +33,8 @@ import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.test.util.JavaProgramTestBase; @@ -119,6 +121,11 @@ public class WriteSinkITCase extends JavaProgramTestBase { } @Override + public void setWindowedWrites(boolean windowedWrites) { + + } + + @Override public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception { @@ -142,13 +149,27 @@ public class WriteSinkITCase extends JavaProgramTestBase { private PrintWriter internalWriter; @Override - public void open(String uId) throws Exception { + public final void openWindowed(String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int numShards) throws Exception { + throw new UnsupportedOperationException("Windowed writes not supported."); + } + + @Override + public final void openUnwindowed(String uId, int shard, int numShards) throws Exception { Path path = new Path(resultPath + "/" + uId); FileSystem.get(new URI("file:///")).create(path, false); internalWriter = new PrintWriter(new File(path.toUri())); } @Override + public void cleanup() throws Exception { + + } + + @Override public void write(String value) throws Exception { internalWriter.println(value); } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 96f0a50..a41c9f5 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.io.BaseEncoding; + import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; @@ -39,6 +40,7 @@ import org.apache.avro.reflect.ReflectData; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; @@ -85,11 +87,21 @@ import org.apache.beam.sdk.values.PDone; * } </pre> * * <p>To write a {@link PCollection} to one or more Avro files, use - * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to} to specify + * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to(String)} to specify * the path of the file to write to (e.g., a local filename or sharded * filename pattern if running locally, or a Google Cloud Storage * filename or sharded filename pattern of the form - * {@code "gs://<bucket>/<filepath>"}). + * {@code "gs://<bucket>/<filepath>"}). {@link AvroIO.Write#to(FilenamePolicy)} can also be used + * to specify a custom file naming policy. + * + * <p>By default, all input is put into the global window before writing. If per-window writes are + * desired - for example, when using a streaming runner - + * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be + * preserved. 
When producing windowed writes, the number of output shards must be set explicitly + * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a + * runner-chosen value, so you may need not set it yourself. A + * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce + * unique filenames. * * <p>It is required to specify {@link AvroIO.Write#withSchema}. To * write specific records, such as Avro-generated classes, provide an @@ -369,6 +381,14 @@ public class AvroIO { } /** + * Returns a {@link PTransform} that writes to the file(s) specified by the provided + * {@link FileBasedSink.FilenamePolicy}. + */ + public static Bound<GenericRecord> to(FilenamePolicy filenamePolicy) { + return new Bound<>(GenericRecord.class).to(filenamePolicy); + } + + /** * Returns a {@link PTransform} that writes to the file(s) with the * given filename suffix. */ @@ -496,6 +516,9 @@ public class AvroIO { final Schema schema; /** An option to indicate if output validation is desired. Default is true. */ final boolean validate; + final boolean windowedWrites; + FilenamePolicy filenamePolicy; + /** * The codec used to encode the blocks in the Avro file. 
String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html @@ -515,7 +538,9 @@ public class AvroIO { null, true, DEFAULT_CODEC, - ImmutableMap.<String, Object>of()); + ImmutableMap.<String, Object>of(), + false, + null); } Bound( @@ -528,7 +553,9 @@ public class AvroIO { Schema schema, boolean validate, SerializableAvroCodecFactory codec, - Map<String, Object> metadata) { + Map<String, Object> metadata, + boolean windowedWrites, + FilenamePolicy filenamePolicy) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -538,6 +565,8 @@ public class AvroIO { this.schema = schema; this.validate = validate; this.codec = codec; + this.windowedWrites = windowedWrites; + this.filenamePolicy = filenamePolicy; Map<String, String> badKeys = Maps.newLinkedHashMap(); for (Map.Entry<String, Object> entry : metadata.entrySet()) { @@ -573,7 +602,25 @@ public class AvroIO { schema, validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); + } + + public Bound<T> to(FilenamePolicy filenamePolicy) { + return new Bound<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + validate, + codec, + metadata, + windowedWrites, + filenamePolicy); } /** @@ -596,7 +643,9 @@ public class AvroIO { schema, validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); } /** @@ -625,7 +674,9 @@ public class AvroIO { schema, validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); } /** @@ -647,7 +698,9 @@ public class AvroIO { schema, validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); } /** @@ -670,7 +723,25 @@ public class AvroIO { schema, validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); + } + + public Bound<T> withWindowedWrites() { + return new Bound<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, 
+ validate, + codec, + metadata, + true, + filenamePolicy); } /** @@ -693,7 +764,9 @@ public class AvroIO { ReflectData.get().getSchema(type), validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); } /** @@ -714,7 +787,9 @@ public class AvroIO { schema, validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); } /** @@ -749,7 +824,9 @@ public class AvroIO { schema, false, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); } /** @@ -769,7 +846,9 @@ public class AvroIO { schema, validate, new SerializableAvroCodecFactory(codec), - metadata); + metadata, + windowedWrites, + filenamePolicy); } /** @@ -789,31 +868,49 @@ public class AvroIO { schema, validate, codec, - metadata); + metadata, + windowedWrites, + filenamePolicy); } @Override public PDone expand(PCollection<T> input) { - if (filenamePrefix == null) { + if (filenamePolicy == null && filenamePrefix == null) { throw new IllegalStateException( "need to set the filename prefix of an AvroIO.Write transform"); } + if (filenamePolicy != null && filenamePrefix != null) { + throw new IllegalStateException( + "cannot set both a filename policy and a filename prefix"); + } if (schema == null) { throw new IllegalStateException("need to set the schema of an AvroIO.Write transform"); } - org.apache.beam.sdk.io.Write<T> write = - org.apache.beam.sdk.io.Write.to( - new AvroSink<>( - filenamePrefix, - filenameSuffix, - shardTemplate, - AvroCoder.of(type, schema), - codec, - metadata)); + org.apache.beam.sdk.io.Write<T> write = null; + if (filenamePolicy != null) { + write = org.apache.beam.sdk.io.Write.to( + new AvroSink<>( + filenamePolicy, + AvroCoder.of(type, schema), + codec, + metadata)); + } else { + write = org.apache.beam.sdk.io.Write.to( + new AvroSink<>( + filenamePrefix, + filenameSuffix, + shardTemplate, + AvroCoder.of(type, schema), + codec, + metadata)); + } if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } + if 
(windowedWrites) { + write = write.withWindowedWrites(); + } return input.apply("Write", write); } @@ -940,6 +1037,18 @@ public class AvroIO { @VisibleForTesting AvroSink( + FilenamePolicy filenamePolicy, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { + super(filenamePolicy); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + @VisibleForTesting + AvroSink( String baseOutputFilename, String extension, String fileNameTemplate, http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index ae28b62..9b5f130 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -17,34 +17,48 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; 
+import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; @@ -146,21 +160,165 @@ public abstract class FileBasedSink<T> extends Sink<T> { */ protected final WritableByteChannelFactory writableByteChannelFactory; + /** - * Base filename for final output files. + * A naming policy for output files. */ - protected final ValueProvider<String> baseOutputFilename; + public abstract static class FilenamePolicy implements Serializable { + /** + * Context used for generating a name based on shard number, and num shards. + * The policy must produce unique filenames for unique {@link Context} objects. + * + * <p>Be careful about adding fields to this as existing strategies will not notice the new + * fields, and may not produce unique filenames. 
+ */ + public static class Context { + private int shardNumber; + private int numShards; + + + public Context(int shardNumber, int numShards) { + this.shardNumber = shardNumber; + this.numShards = numShards; + } + + public int getShardNumber() { + return shardNumber; + } + + + public int getNumShards() { + return numShards; + } + } + + /** + * Context used for generating a name based on window, pane, shard number, and num shards. + * The policy must produce unique filenames for unique {@link WindowedContext} objects. + * + * <p>Be careful about adding fields to this as existing strategies will not notice the new + * fields, and may not produce unique filenames. + */ + public static class WindowedContext { + private int shardNumber; + private int numShards; + private BoundedWindow window; + private PaneInfo paneInfo; + + public WindowedContext( + BoundedWindow window, + PaneInfo paneInfo, + int shardNumber, + int numShards) { + this.window = window; + this.paneInfo = paneInfo; + this.shardNumber = shardNumber; + this.numShards = numShards; + } + + public BoundedWindow getWindow() { + return window; + } + + public PaneInfo getPaneInfo() { + return paneInfo; + } + + public int getShardNumber() { + return shardNumber; + } + + public int getNumShards() { + return numShards; + } + } + + /** + * When a sink has requested windowed or triggered output, this method will be invoked to return + * the filename. The {@link WindowedContext} object gives access to the window and pane, as + * well as sharding information. The policy must return unique and consistent filenames + * for different windows and panes. + */ + public abstract String windowedFilename(WindowedContext c); + + /** + * When a sink has not requested windowed output, this method will be invoked to return the + * filename. The {@link Context} object only provides sharding information, which is used by + * the policy to generate unique and consistent filenames. 
+ */ + public abstract String unwindowedFilename(Context c); + + /** + * @return The base filename for all output files. + */ + public abstract ValueProvider<String> getBaseOutputFilenameProvider(); + + /** + * Populates the display data. + */ + public void populateDisplayData(DisplayData.Builder builder) { + } + } /** - * The extension to be used for the final output files. + * A default filename policy. */ - protected final String extension; + protected class DefaultFilenamePolicy extends FilenamePolicy { + ValueProvider<String> baseOutputFilename; + String extension; + String fileNamingTemplate; + + public DefaultFilenamePolicy(ValueProvider<String> baseOutputFilename, String extension, + String fileNamingTemplate) { + this.baseOutputFilename = baseOutputFilename; + if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) { + this.extension = extension + getFileExtension( + writableByteChannelFactory.getFilenameSuffix()); + } else { + this.extension = extension; + } + this.fileNamingTemplate = fileNamingTemplate; + } + + @Override + public String unwindowedFilename(FilenamePolicy.Context context) { + if (context.numShards <= 0) { + return null; + } + + String suffix = getFileExtension(extension); + String filename = IOChannelUtils.constructName( + baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(), + context.getNumShards()); + return filename; + } + + @Override + public String windowedFilename(FilenamePolicy.WindowedContext c) { + throw new UnsupportedOperationException("There is no default policy for windowed file" + + " output. Please provide an explicit FilenamePolicy to generate filenames."); + } + + @Override + public ValueProvider<String> getBaseOutputFilenameProvider() { + return baseOutputFilename; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + String fileNamePattern = String.format("%s%s%s", + baseOutputFilename.isAccessible() + ? 
baseOutputFilename.get() : baseOutputFilename.toString(), + fileNamingTemplate, getFileExtension(extension)); + builder.add(DisplayData.item("fileNamePattern", fileNamePattern) + .withLabel("File Name Pattern")); + } + } /** - * Naming template for output files. See {@link ShardNameTemplate} for a description of - * possible naming templates. Default is {@link ShardNameTemplate#INDEX_OF_MAX}. + * The policy used to generate output filenames. */ - protected final String fileNamingTemplate; + protected FilenamePolicy fileNamePolicy; /** * Construct a FileBasedSink with the given base output filename and extension. A @@ -201,20 +359,30 @@ public abstract class FileBasedSink<T> extends Sink<T> { public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension, String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) { this.writableByteChannelFactory = writableByteChannelFactory; - this.baseOutputFilename = baseOutputFilename; - if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) { - this.extension = extension + getFileExtension(writableByteChannelFactory.getFilenameSuffix()); - } else { - this.extension = extension; - } - this.fileNamingTemplate = fileNamingTemplate; + this.fileNamePolicy = new DefaultFilenamePolicy(baseOutputFilename, extension, + fileNamingTemplate); + } + + public FileBasedSink(FilenamePolicy fileNamePolicy) { + this(fileNamePolicy, CompressionType.UNCOMPRESSED); + + } + + public FileBasedSink(FilenamePolicy fileNamePolicy, + WritableByteChannelFactory writableByteChannelFactory) { + this.fileNamePolicy = fileNamePolicy; + this.writableByteChannelFactory = writableByteChannelFactory; } /** * Returns the base output filename for this file based sink. 
*/ public ValueProvider<String> getBaseOutputFilenameProvider() { - return baseOutputFilename; + return fileNamePolicy.getBaseOutputFilenameProvider(); + } + + public FilenamePolicy getFileNamePolicy() { + return fileNamePolicy; } @Override @@ -230,13 +398,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - - String fileNamePattern = String.format("%s%s%s", - baseOutputFilename.isAccessible() - ? baseOutputFilename.get() : baseOutputFilename.toString(), - fileNamingTemplate, getFileExtension(extension)); - builder.add(DisplayData.item("fileNamePattern", fileNamePattern) - .withLabel("File Name Pattern")); + getFileNamePolicy().populateDisplayData(builder); } /** @@ -286,7 +448,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { * constructor arguments. * * <p>Subclass implementations can change the file naming template by supplying a value for - * {@link FileBasedSink#fileNamingTemplate}. + * fileNamingTemplate. * * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary * files will occur. @@ -304,6 +466,9 @@ public abstract class FileBasedSink<T> extends Sink<T> { /** Directory for temporary output files. */ protected final ValueProvider<String> tempDirectory; + /** Whether windowed writes are being used. */ + protected boolean windowedWrites; + /** Constructs a temporary file path given the temporary directory and a filename. 
*/ protected static String buildTemporaryFilename(String tempDirectory, String filename) throws IOException { @@ -361,6 +526,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { private FileBasedWriteOperation(FileBasedSink<T> sink, ValueProvider<String> tempDirectory) { this.sink = sink; this.tempDirectory = tempDirectory; + this.windowedWrites = false; } /** @@ -371,6 +537,11 @@ public abstract class FileBasedSink<T> extends Sink<T> { @Override public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception; + @Override + public void setWindowedWrites(boolean windowedWrites) { + this.windowedWrites = windowedWrites; + } + /** * Initialization of the sink. Default implementation is a no-op. May be overridden by subclass * implementations to perform initialization of the sink at pipeline runtime. This method must @@ -395,22 +566,55 @@ public abstract class FileBasedSink<T> extends Sink<T> { * @param writerResults the results of writes (FileResult). */ @Override - public void finalize(Iterable<FileResult> writerResults, PipelineOptions options) + public void finalize(Iterable<FileResult> writerResults, + PipelineOptions options) throws Exception { // Collect names of temporary files and rename them. - List<String> files = new ArrayList<>(); - for (FileResult result : writerResults) { - LOG.debug("Temporary bundle output file {} will be copied.", result.getFilename()); - files.add(result.getFilename()); - } - copyToOutputFiles(files, options); + Map<String, String> outputFilenames = buildOutputFilenames(writerResults); + copyToOutputFiles(outputFilenames, options); + // Optionally remove temporary files. // We remove the entire temporary directory, rather than specifically removing the files // from writerResults, because writerResults includes only successfully completed bundles, // and we'd like to clean up the failed ones too. 
// Note that due to GCS eventual consistency, matching files in the temp directory is also // currently non-perfect and may fail to delete some files. - removeTemporaryFiles(files, options); + // + // When windows or triggers are specified, files are generated incrementally so deleting + // the entire directory in finalize is incorrect. + removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites, options); + } + + protected final Map<String, String> buildOutputFilenames(Iterable<FileResult> writerResults) { + Map<String, String> outputFilenames = new HashMap<>(); + List<String> files = new ArrayList<>(); + for (FileResult result : writerResults) { + if (result.getDestinationFilename() != null) { + outputFilenames.put(result.getFilename(), result.getDestinationFilename()); + } else { + files.add(result.getFilename()); + } + } + + // If the user does not specify numShards() (not supported with windowing). Then the + // writerResults won't contain destination filenames, so we dynamically generate them here. + if (files.size() > 0) { + checkArgument(outputFilenames.isEmpty()); + // Sort files for idempotence. + files = Ordering.natural().sortedCopy(files); + FilenamePolicy filenamePolicy = getSink().fileNamePolicy; + for (int i = 0; i < files.size(); i++) { + outputFilenames.put(files.get(i), + filenamePolicy.unwindowedFilename(new Context(i, files.size()))); + } + } + + int numDistinctShards = new HashSet<String>(outputFilenames.values()).size(); + checkState(numDistinctShards == outputFilenames.size(), + "Only generated %s distinct file names for %s files.", + numDistinctShards, outputFilenames.size()); + + return outputFilenames; } /** @@ -425,47 +629,19 @@ public abstract class FileBasedSink<T> extends Sink<T> { * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc. * * @param filenames the filenames of temporary files. - * @return a list containing the names of final output files. 
*/ - protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options) + protected final void copyToOutputFiles(Map<String, String> filenames, + PipelineOptions options) throws IOException { int numFiles = filenames.size(); - // Sort files for idempotence. - List<String> srcFilenames = Ordering.natural().sortedCopy(filenames); - List<String> destFilenames = generateDestinationFilenames(numFiles); - if (numFiles > 0) { LOG.debug("Copying {} files.", numFiles); - IOChannelUtils.getFactory(destFilenames.get(0)) - .copy(srcFilenames, destFilenames); + IOChannelFactory channelFactory = + IOChannelUtils.getFactory(filenames.values().iterator().next()); + channelFactory.copy(filenames.keySet(), filenames.values()); } else { LOG.info("No output files to write."); } - - return destFilenames; - } - - /** - * Generate output bundle filenames. - */ - protected final List<String> generateDestinationFilenames(int numFiles) { - List<String> destFilenames = new ArrayList<>(); - String extension = getSink().extension; - String baseOutputFilename = getSink().baseOutputFilename.get(); - String fileNamingTemplate = getSink().fileNamingTemplate; - - String suffix = getFileExtension(extension); - for (int i = 0; i < numFiles; i++) { - destFilenames.add(IOChannelUtils.constructName( - baseOutputFilename, fileNamingTemplate, suffix, i, numFiles)); - } - - int numDistinctShards = new HashSet<String>(destFilenames).size(); - checkState(numDistinctShards == numFiles, - "Shard name template '%s' only generated %s distinct file names for %s files.", - fileNamingTemplate, numDistinctShards, numFiles); - - return destFilenames; } /** @@ -475,7 +651,9 @@ public abstract class FileBasedSink<T> extends Sink<T> { * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize * temporary files, this method will remove them. 
*/ - protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options) + protected final void removeTemporaryFiles(Set<String> knownFiles, + boolean shouldRemoveTemporaryDirectory, + PipelineOptions options) throws IOException { String tempDir = tempDirectory.get(); LOG.debug("Removing temporary bundle output files in {}.", tempDir); @@ -485,15 +663,18 @@ public abstract class FileBasedSink<T> extends Sink<T> { // directory matching APIs, we remove not only files that the filesystem says exist // in the directory (which may be incomplete), but also files that are known to exist // (produced by successfully completed bundles). + // This may still fail to remove temporary outputs of some failed bundles, but at least // the common case (where all bundles succeed) is guaranteed to be fully addressed. Set<String> matches = new HashSet<>(); // TODO: Windows OS cannot resolves and matches '*' in the path, // ignore the exception for now to avoid failing the pipeline. - try { - matches.addAll(factory.match(factory.resolve(tempDir, "*"))); - } catch (Exception e) { - LOG.warn("Failed to match temporary files under: [{}].", tempDir); + if (shouldRemoveTemporaryDirectory) { + try { + matches.addAll(factory.match(factory.resolve(tempDir, "*"))); + } catch (Exception e) { + LOG.warn("Failed to match temporary files under: [{}].", tempDir); + } } Set<String> allMatches = new HashSet<>(matches); allMatches.addAll(knownFiles); @@ -517,7 +698,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { */ @Override public Coder<FileResult> getWriterResultCoder() { - return SerializableCoder.of(FileResult.class); + return FileResultCoder.of(); } /** @@ -553,8 +734,13 @@ public abstract class FileBasedSink<T> extends Sink<T> { */ private String id; + private BoundedWindow window; + private PaneInfo paneInfo; + private int shard = -1; + private int numShards = -1; + /** - * The filename of the output bundle - $tempDirectory/$id. 
+ * The filename of the output bundle. */ private String filename; @@ -610,8 +796,37 @@ public abstract class FileBasedSink<T> extends Sink<T> { * Opens the channel. */ @Override - public final void open(String uId) throws Exception { + public final void openWindowed(String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int numShards) throws Exception { + if (!getWriteOperation().windowedWrites) { + throw new IllegalStateException("openWindowed called a non-windowed sink."); + } + open(uId, window, paneInfo, shard, numShards); + } + + @Override + public final void openUnwindowed(String uId, + int shard, + int numShards) throws Exception { + if (getWriteOperation().windowedWrites) { + throw new IllegalStateException("openUnwindowed called a windowed sink."); + } + open(uId, null, null, shard, numShards); + } + + private void open(String uId, + @Nullable BoundedWindow window, + @Nullable PaneInfo paneInfo, + int shard, + int numShards) throws Exception { this.id = uId; + this.window = window; + this.paneInfo = paneInfo; + this.shard = shard; + this.numShards = numShards; filename = FileBasedWriteOperation.buildTemporaryFilename( getWriteOperation().tempDirectory.get(), uId); LOG.debug("Opening {}.", filename); @@ -639,6 +854,13 @@ public abstract class FileBasedSink<T> extends Sink<T> { LOG.debug("Starting write of bundle {} to {}.", this.id, filename); } + @Override + public void cleanup() throws Exception { + if (filename != null) { + IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename)); + } + } + /** * Closes the channel and returns the bundle result. 
*/ @@ -653,8 +875,17 @@ public abstract class FileBasedSink<T> extends Sink<T> { throw new IllegalStateException("Channel should only be closed by its owner: " + channel); } } - FileResult result = new FileResult(filename); - LOG.debug("Result for bundle {}: {}", this.id, filename); + + FilenamePolicy filenamePolicy = getWriteOperation().getSink().fileNamePolicy; + String destinationFile; + if (window != null) { + destinationFile = filenamePolicy.windowedFilename(new WindowedContext( + window, paneInfo, shard, numShards)); + } else { + destinationFile = filenamePolicy.unwindowedFilename(new Context(shard, numShards)); + } + FileResult result = new FileResult(filename, destinationFile); + LOG.debug("Result for bundle {}: {} {}", this.id, filename, destinationFile); return result; } @@ -670,18 +901,62 @@ public abstract class FileBasedSink<T> extends Sink<T> { /** * Result of a single bundle write. Contains the filename of the bundle. */ - public static final class FileResult implements Serializable { + public static final class FileResult { private final String filename; + private final String destinationFilename; - public FileResult(String filename) { + public FileResult(String filename, String destinationFilename) { this.filename = filename; + this.destinationFilename = destinationFilename; } public String getFilename() { return filename; } + + public String getDestinationFilename() { + return destinationFilename; + } + + } + + /** + * A coder for FileResult objects. 
+ */ + public static final class FileResultCoder extends AtomicCoder<FileResult> { + private static final FileResultCoder INSTANCE = new FileResultCoder(); + private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of()); + + @JsonCreator + public static FileResultCoder of() { + return INSTANCE; + } + + @Override + public void encode(FileResult value, OutputStream outStream, Context context) + throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null value"); + } + stringCoder.encode(value.getFilename(), outStream, context.nested()); + stringCoder.encode(value.getDestinationFilename(), outStream, context.nested()); + } + + @Override + public FileResult decode(InputStream inStream, Context context) + throws IOException { + return new FileResult( + stringCoder.decode(inStream, context.nested()), + stringCoder.decode(inStream, context.nested())); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, "TableRows are not deterministic."); + } } + /** * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink} * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java index 6742784..d53c6ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; import 
org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; /** @@ -63,11 +65,12 @@ import org.apache.beam.sdk.values.PCollection; * operation corresponds to. See below for more information about these methods and restrictions on * their implementation. * - * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines four methods: - * {@link Writer#open}, which is called once at the start of writing a bundle; {@link Writer#write}, - * which writes a single record from the bundle; {@link Writer#close}, which is called once at the - * end of writing a bundle; and {@link Writer#getWriteOperation}, which returns the write operation - * that the writer belongs to. + * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines several methods: + * {@link Writer#openWindowed} and {@link Writer#openUnwindowed}, which are called once at the + * start of writing a bundle, depending on whether windowed or unwindowed output is requested. + * {@link Writer#write}, which writes a single record from the bundle; {@link Writer#close}, + * which is called once at the end of writing a bundle; and {@link Writer#getWriteOperation}, + * which returns the write operation that the writer belongs to. * </ul> * * <h2>WriteOperation</h2> @@ -95,9 +98,10 @@ import org.apache.beam.sdk.values.PCollection; * * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the * event of failure/retry or for redundancy). However, exactly one of these executions will have its - * result passed to the WriteOperation's finalize method. Each call to {@link Writer#open} is passed - * a unique <i>bundle id</i> when it is called by the Write transform, so even redundant or retried - * bundles will have a unique way of identifying their output. 
+ * result passed to the WriteOperation's finalize method. Each call to {@link Writer#openWindowed} + * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the + * Write transform, so even redundant or retried bundles will have a unique way of identifying + * their output. * * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness * guarantee is important; if a bundle is to be output to a file, for example, the name of the file @@ -174,6 +178,11 @@ public abstract class Sink<T> implements Serializable, HasDisplayData { public abstract void initialize(PipelineOptions options) throws Exception; /** + * Indicates that the operation will be performing windowed writes. + */ + public abstract void setWindowedWrites(boolean windowedWrites); + + /** * Given an Iterable of results from bundle writes, performs finalization after writing and * closes the sink. Called after all bundle writes are complete. * @@ -200,7 +209,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData { * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink. * * <p>The bundle id that the writer will use to uniquely identify its output will be passed to - * {@link Writer#open}. + * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}. * * <p>Must not mutate the state of the WriteOperation. */ @@ -218,9 +227,10 @@ public abstract class Sink<T> implements Serializable, HasDisplayData { } /** - * A Writer writes a bundle of elements from a PCollection to a sink. {@link Writer#open} is - * called before writing begins and {@link Writer#close} is called after all elements in the - * bundle have been written. {@link Writer#write} writes an element to the sink. + * A Writer writes a bundle of elements from a PCollection to a sink. 
+ * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins + * and {@link Writer#close} is called after all elements in the bundle have been written. + * {@link Writer#write} writes an element to the sink. * * <p>Note that any access to static members or methods of a Writer must be thread-safe, as * multiple instances of a Writer may be instantiated in different threads on the same worker. @@ -238,8 +248,25 @@ public abstract class Sink<T> implements Serializable, HasDisplayData { * <p>The unique id that is given to open should be used to ensure that the writer's output does * not interfere with the output of other Writers, as a bundle may be executed many times for * fault tolerance. See {@link Sink} for more information about bundle ids. + * + * <p>The window and paneInfo arguments are populated when windowed writes are requested. + * shard and numShards are populated for the case of static sharding. In cases where the + * runner is dynamically picking sharding, shard and numShards might both be set to -1. + */ + public abstract void openWindowed(String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int numShards) throws Exception; + + /** + * Perform bundle initialization for the case where the file is written unwindowed. */ - public abstract void open(String uId) throws Exception; + public abstract void openUnwindowed(String uId, + int shard, + int numShards) throws Exception; + + public abstract void cleanup() throws Exception; /** * Called for each value in the bundle. @@ -262,5 +289,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData { * Returns the write operation this writer belongs to. 
*/ public abstract WriteOperation<T, WriteT> getWriteOperation(); + + } } http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 58b55a9..ea80639 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -41,6 +41,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; @@ -85,6 +86,14 @@ import org.apache.beam.sdk.values.PDone; * filename or sharded filename pattern of the form * {@code "gs://<bucket>/<filepath>"}). * + * <p>By default, all input is put into the global window before writing. If per-window writes are + * desired - for example, when using a streaming runner - + * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be + * preserved. When producing windowed writes, the number of output shards must be set explicitly + * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a + * runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} must be + * set, and unique windows and triggers must produce unique filenames. + * * <p>Any existing files with the same names as generated output files * will be overwritten. 
* @@ -352,6 +361,10 @@ public class TextIO { return new Bound().to(prefix); } + public static Bound to(FilenamePolicy filenamePolicy) { + return new Bound().to(filenamePolicy); + + } /** * Like {@link #to(String)}, but with a {@link ValueProvider}. */ @@ -479,6 +492,12 @@ public class TextIO { /** An option to indicate if output validation is desired. Default is true. */ private final boolean validate; + /** A policy for naming output files. */ + private final FilenamePolicy filenamePolicy; + + /** Whether to write windowed output files. */ + private boolean windowedWrites; + /** * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is * {@link FileBasedSink.CompressionType#UNCOMPRESSED}. @@ -487,13 +506,15 @@ public class TextIO { private Bound() { this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE, true, - FileBasedSink.CompressionType.UNCOMPRESSED); + FileBasedSink.CompressionType.UNCOMPRESSED, null, false); } private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix, @Nullable String header, @Nullable String footer, int numShards, String shardTemplate, boolean validate, - WritableByteChannelFactory writableByteChannelFactory) { + WritableByteChannelFactory writableByteChannelFactory, + FilenamePolicy filenamePolicy, + boolean windowedWrites) { super(name); this.header = header; this.footer = footer; @@ -504,6 +525,8 @@ public class TextIO { this.validate = validate; this.writableByteChannelFactory = firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED); + this.filenamePolicy = filenamePolicy; + this.windowedWrites = windowedWrites; } /** @@ -518,7 +541,7 @@ public class TextIO { validateOutputComponent(filenamePrefix); return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, header, footer, numShards, shardTemplate, validate, - writableByteChannelFactory); + writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -526,7 
+549,15 @@ public class TextIO { */ public Bound to(ValueProvider<String> filenamePrefix) { return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, - shardTemplate, validate, writableByteChannelFactory); + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); + } + + /** + * Like {@link #to(String)}, but with a {@link FilenamePolicy}. + */ + public Bound to(FilenamePolicy filenamePolicy) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -540,7 +571,7 @@ public class TextIO { public Bound withSuffix(String nameExtension) { validateOutputComponent(nameExtension); return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards, - shardTemplate, validate, writableByteChannelFactory); + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -560,7 +591,7 @@ public class TextIO { public Bound withNumShards(int numShards) { checkArgument(numShards >= 0); return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, - shardTemplate, validate, writableByteChannelFactory); + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -573,7 +604,7 @@ public class TextIO { */ public Bound withShardNameTemplate(String shardTemplate) { return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, - shardTemplate, validate, writableByteChannelFactory); + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -591,7 +622,7 @@ public class TextIO { */ public Bound withoutSharding() { return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "", - validate, writableByteChannelFactory); + validate, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -606,7 +637,7 @@ public class TextIO { */ 
public Bound withoutValidation() { return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, - shardTemplate, false, writableByteChannelFactory); + shardTemplate, false, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -621,7 +652,7 @@ public class TextIO { */ public Bound withHeader(@Nullable String header) { return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, - shardTemplate, validate, writableByteChannelFactory); + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -636,7 +667,7 @@ public class TextIO { */ public Bound withFooter(@Nullable String footer) { return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, - shardTemplate, validate, writableByteChannelFactory); + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); } /** @@ -653,22 +684,39 @@ public class TextIO { public Bound withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, - shardTemplate, validate, writableByteChannelFactory); + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites); + } + + public Bound withWindowedWrites() { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, + shardTemplate, validate, writableByteChannelFactory, filenamePolicy, true); } @Override public PDone expand(PCollection<String> input) { - if (filenamePrefix == null) { + if (filenamePolicy == null && filenamePrefix == null) { + throw new IllegalStateException( + "need to set the filename prefix of a TextIO.Write transform"); + } + if (filenamePolicy != null && filenamePrefix != null) { throw new IllegalStateException( - "need to set the filename prefix of a TextIO.Write transform"); + "cannot set both a filename policy and a filename prefix"); + } + 
org.apache.beam.sdk.io.Write<String> write = null; + if (filenamePolicy != null) { + write = org.apache.beam.sdk.io.Write.to( + new TextSink(filenamePolicy, header, footer, writableByteChannelFactory)); + } else { + write = org.apache.beam.sdk.io.Write.to( + new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate, + writableByteChannelFactory)); } - org.apache.beam.sdk.io.Write<String> write = - org.apache.beam.sdk.io.Write.to( - new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate, - writableByteChannelFactory)); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } + if (windowedWrites) { + write = write.withWindowedWrites(); + } return input.apply("Write", write); } @@ -676,8 +724,11 @@ public class TextIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String prefixString = filenamePrefix.isAccessible() - ? filenamePrefix.get() : filenamePrefix.toString(); + String prefixString = ""; + if (filenamePrefix != null) { + prefixString = filenamePrefix.isAccessible() + ? filenamePrefix.get() : filenamePrefix.toString(); + } builder .addIfNotNull(DisplayData.item("filePrefix", prefixString) .withLabel("Output File Prefix")) @@ -1023,6 +1074,13 @@ public class TextIO { @Nullable private final String footer; @VisibleForTesting + TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer, + WritableByteChannelFactory writableByteChannelFactory) { + super(filenamePolicy, writableByteChannelFactory); + this.header = header; + this.footer = footer; + } + @VisibleForTesting TextSink( ValueProvider<String> baseOutputFilename, String extension,
