Repository: beam Updated Branches: refs/heads/release-2.2.0 8da5c1b61 -> 55a1124ac
Cherrypick WriteFiles fix. This closes #4124: [BEAM-3169] Fixes a data loss bug in WriteFiles when used with fire-once triggers Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55a1124a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55a1124a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55a1124a Branch: refs/heads/release-2.2.0 Commit: 55a1124ac8d39de79d2fd985cec08fcacd7775eb Parents: 8da5c1b Author: Eugene Kirpichov <[email protected]> Authored: Tue Nov 14 19:34:42 2017 -0800 Committer: Reuven Lax <[email protected]> Committed: Wed Nov 15 19:51:52 2017 +0800 ---------------------------------------------------------------------- .../apex/translation/ParDoTranslator.java | 2 +- .../runners/apex/examples/WordCountTest.java | 2 +- .../construction/WriteFilesTranslationTest.java | 1 - .../beam/runners/spark/io/AvroPipelineTest.java | 12 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 168 ++++----- .../java/org/apache/beam/sdk/io/WriteFiles.java | 375 ++++++++++++------- .../apache/beam/sdk/io/FileBasedSinkTest.java | 77 ++-- .../org/apache/beam/sdk/io/TextIOWriteTest.java | 84 +++-- .../org/apache/beam/sdk/io/WriteFilesTest.java | 47 ++- 9 files changed, 453 insertions(+), 315 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index dd4bd67..6a052d1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -213,7 +213,7 @@ class ParDoTranslator<InputT, OutputT> sideInputCollection.getWindowingStrategy()); } if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) { - String msg = "Multiple side inputs with different coders."; + String msg = context.getFullName() + ": Multiple side inputs with different coders."; throw new UnsupportedOperationException(msg); } sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput)); http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java index ba75746..e050c15 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java @@ -108,7 +108,7 @@ public class WordCountTest { .apply(ParDo.of(new ExtractWordsFn())) .apply(Count.<String>perElement()) .apply(ParDo.of(new FormatAsStringFn())) - .apply("WriteCounts", TextIO.write().to(options.getOutput())) + .apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(2)) ; p.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index e8eda76..689518a 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ -64,7 +64,6 @@ public class WriteFilesTranslationTest { public static Iterable<WriteFiles<Object, Void, Object>> data() { return ImmutableList.of( WriteFiles.to(new DummySink()), - WriteFiles.to(new DummySink()).withWindowedWrites(), WriteFiles.to(new DummySink()).withNumShards(17), WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42)); } http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index adde8d2..e17a6b8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -47,7 +47,7 @@ import org.junit.rules.TemporaryFolder; public class AvroPipelineTest { private File inputFile; - private File outputDir; + private File outputFile; @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); @@ -58,8 +58,7 @@ public class AvroPipelineTest { @Before public void setUp() throws IOException { inputFile = tmpDir.newFile("test.avro"); - outputDir = tmpDir.newFolder("out"); - outputDir.delete(); + outputFile = new File(tmpDir.getRoot(), "out.avro"); } @Test @@ -73,7 +72,10 @@ public class AvroPipelineTest { PCollection<GenericRecord> input = pipeline.apply( AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath())); - input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath())); + input.apply( + AvroIO.writeGenericRecords(schema) + .to(outputFile.getAbsolutePath()) + .withoutSharding()); pipeline.run(); List<GenericRecord> records = readGenericFile(); @@ -98,7 +100,7 @@ public class AvroPipelineTest { List<GenericRecord> records = Lists.newArrayList(); GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(); try (DataFileReader<GenericRecord> dataFileReader = - new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) { + new DataFileReader<>(outputFile, genericDatumReader)) { for (GenericRecord record : dataFileReader) { records.add(record); } http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index ea5129f..f949112 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -44,6 +44,7 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.MimeTypes; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor; @@ -123,6 +125,7 @@ import org.slf4j.LoggerFactory; public abstract class FileBasedSink<UserT, DestinationT, OutputT> implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); + static final String TEMP_DIRECTORY_PREFIX = ".temp-beam"; /** @deprecated use {@link Compression}. */ @Deprecated @@ -504,7 +507,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> implements SerializableFunction<ResourceId, ResourceId> { private static final AtomicLong TEMP_COUNT = new AtomicLong(0); private static final DateTimeFormatter TEMPDIR_TIMESTAMP = - DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"); + DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss"); // The intent of the code is to have a consistent value of tempDirectory across // all workers, which wouldn't happen if now() was called inline. private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP); @@ -515,7 +518,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> @Override public ResourceId apply(ResourceId tempDirectory) { // Temp directory has a timestamp and a unique ID - String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId); + String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s-%s", timestamp, tempId); return tempDirectory .getCurrentDirectory() .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY); @@ -551,30 +554,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> this.windowedWrites = windowedWrites; } - /** - * Finalizes writing by copying temporary output files to their final location. - * - * <p>Finalization may be overridden by subclass implementations to perform customized - * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code - * writerResults} contains the filenames of written bundles. - * - * <p>If subclasses override this method, they must guarantee that its implementation is - * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It - * is a best practice to attempt to try to make this method atomic. - * - * <p>Returns the map of temporary files generated to final filenames. Callers must call {@link - * #removeTemporaryFiles(Set)} to cleanup the temporary files. - * - * @param writerResults the results of writes (FileResult). - */ - public Map<ResourceId, ResourceId> finalize(Iterable<FileResult<DestinationT>> writerResults) - throws Exception { - // Collect names of temporary files and copies them. - Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults); - copyToOutputFiles(outputFilenames); - return outputFilenames; - } - /* * Remove temporary files after finalization. * @@ -596,34 +575,52 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> } @Experimental(Kind.FILESYSTEM) - protected final Map<ResourceId, ResourceId> buildOutputFilenames( + protected final List<KV<FileResult<DestinationT>, ResourceId>> buildOutputFilenames( + @Nullable DestinationT dest, + @Nullable BoundedWindow window, + @Nullable Integer numShards, Iterable<FileResult<DestinationT>> writerResults) { - int numShards = Iterables.size(writerResults); - Map<ResourceId, ResourceId> outputFilenames = Maps.newHashMap(); - - // Either all results have a shard number set (if the sink is configured with a fixed - // number of shards), or they all don't (otherwise). - Boolean isShardNumberSetEverywhere = null; - for (FileResult<DestinationT> result : writerResults) { - boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM); - if (isShardNumberSetEverywhere == null) { - isShardNumberSetEverywhere = isShardNumberSetHere; - } else { - checkArgument( - isShardNumberSetEverywhere == isShardNumberSetHere, - "Found a mix of files with and without shard number set: %s", - result); - } + for (FileResult<DestinationT> res : writerResults) { + checkArgument( + Objects.equals(dest, res.getDestination()), + "File result has wrong destination: expected %s, got %s", + dest, res.getDestination()); + checkArgument( + Objects.equals(window, res.getWindow()), + "File result has wrong window: expected %s, got %s", + window, res.getWindow()); } + List<KV<FileResult<DestinationT>, ResourceId>> outputFilenames = Lists.newArrayList(); - if (isShardNumberSetEverywhere == null) { - isShardNumberSetEverywhere = true; + final int effectiveNumShards; + if (numShards != null) { + effectiveNumShards = numShards; + for (FileResult<DestinationT> res : writerResults) { + checkArgument( + res.getShard() != UNKNOWN_SHARDNUM, + "Fixed sharding into %s shards was specified, " + + "but file result %s does not specify a shard", + numShards, + res); + } + } else { + effectiveNumShards = Iterables.size(writerResults); + for (FileResult<DestinationT> res : writerResults) { + checkArgument( + res.getShard() == UNKNOWN_SHARDNUM, + "Runner-chosen sharding was specified, " + + "but file result %s explicitly specifies a shard", + res); + } } List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList(); - if (isShardNumberSetEverywhere) { + if (numShards != null) { resultsWithShardNumbers = Lists.newArrayList(writerResults); } else { + checkState( + !windowedWrites, + "When doing windowed writes, shards should have been assigned when writing"); // Sort files for idempotence. Sort by temporary filename. // Note that this codepath should not be used when processing triggered windows. In the // case of triggers, the list of FileResult objects in the Finalize iterable is not @@ -646,23 +643,24 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> } } + Map<ResourceId, FileResult<DestinationT>> distinctFilenames = Maps.newHashMap(); for (FileResult<DestinationT> result : resultsWithShardNumbers) { checkArgument( result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result); - outputFilenames.put( - result.getTempFilename(), - result.getDestinationFile( - getSink().getDynamicDestinations(), - numShards, - getSink().getWritableByteChannelFactory())); + ResourceId finalFilename = result.getDestinationFile( + getSink().getDynamicDestinations(), + effectiveNumShards, + getSink().getWritableByteChannelFactory()); + checkArgument( + !distinctFilenames.containsKey(finalFilename), + "Filename policy must generate unique filenames, but generated the same name %s " + + "for file results %s and %s", + finalFilename, + result, + distinctFilenames.get(finalFilename)); + distinctFilenames.put(finalFilename, result); + outputFilenames.add(KV.of(result, finalFilename)); } - - int numDistinctShards = new HashSet<>(outputFilenames.values()).size(); - checkState( - numDistinctShards == outputFilenames.size(), - "Only generated %s distinct file names for %s files.", - numDistinctShards, - outputFilenames.size()); return outputFilenames; } @@ -677,20 +675,23 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B * will be copied to dir/file-001-of-003.txt, etc. - * - * @param filenames the filenames of temporary files. */ @VisibleForTesting @Experimental(Kind.FILESYSTEM) - final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException { - int numFiles = filenames.size(); + final void copyToOutputFiles( + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException { + int numFiles = resultsToFinalFilenames.size(); if (numFiles > 0) { LOG.debug("Copying {} files.", numFiles); - List<ResourceId> srcFiles = new ArrayList<>(filenames.size()); - List<ResourceId> dstFiles = new ArrayList<>(filenames.size()); - for (Map.Entry<ResourceId, ResourceId> srcDestPair : filenames.entrySet()) { - srcFiles.add(srcDestPair.getKey()); - dstFiles.add(srcDestPair.getValue()); + List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size()); + List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size()); + for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { + srcFiles.add(entry.getKey().getTempFilename()); + dstFiles.add(entry.getValue()); + LOG.info( + "Will copy temporary file {} to final location {}", + entry.getKey().getTempFilename(), + entry.getValue()); } // During a failure case, files may have been deleted in an earlier step. Thus // we ignore missing files here. @@ -721,7 +722,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> // This may still fail to remove temporary outputs of some failed bundles, but at least // the common case (where all bundles succeed) is guaranteed to be fully addressed. - Set<ResourceId> matches = new HashSet<>(); + Set<ResourceId> allMatches = new HashSet<>(knownFiles); + for (ResourceId match : allMatches) { + LOG.info("Will remove known temporary file {}", match); + } // TODO: Windows OS cannot resolves and matches '*' in the path, // ignore the exception for now to avoid failing the pipeline. if (shouldRemoveTemporaryDirectory) { @@ -730,28 +734,24 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> Iterables.getOnlyElement( FileSystems.match(Collections.singletonList(tempDir.toString() + "*"))); for (Metadata matchResult : singleMatch.metadata()) { - matches.add(matchResult.resourceId()); + if (allMatches.add(matchResult.resourceId())) { + LOG.info("Will also remove unknown temporary file {}", matchResult.resourceId()); + } } } catch (Exception e) { LOG.warn("Failed to match temporary files under: [{}].", tempDir); } } - Set<ResourceId> allMatches = new HashSet<>(matches); - allMatches.addAll(knownFiles); - LOG.debug( - "Removing {} temporary files found under {} ({} matched glob, {} known files)", - allMatches.size(), - tempDir, - matches.size(), - allMatches.size() - matches.size()); FileSystems.delete(allMatches, StandardMoveOptions.IGNORE_MISSING_FILES); - // Deletion of the temporary directory might fail, if not all temporary files are removed. - try { - FileSystems.delete( - Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES); - } catch (Exception e) { - LOG.warn("Failed to remove temporary directory: [{}].", tempDir); + if (shouldRemoveTemporaryDirectory) { + // Deletion of the temporary directory might fail, if not all temporary files are removed. + try { + FileSystems.delete( + Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES); + } catch (Exception e) { + LOG.warn("Failed to remove temporary directory: [{}].", tempDir); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 72ce5d0..f384dd5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -25,6 +25,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Objects; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -32,6 +33,7 @@ import com.google.common.collect.Sets; import com.google.common.hash.Hashing; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -62,6 +64,8 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -181,12 +185,13 @@ public class WriteFiles<UserT, DestinationT, OutputT> checkArgument(windowedWrites, "Must use windowed writes when applying %s to an unbounded PCollection", WriteFiles.class.getSimpleName()); + } + if (windowedWrites) { // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438 // and similar behavior in other runners. checkArgument( computeNumShards != null || numShardsProvider != null, - "When applying %s to an unbounded PCollection, " - + "must specify number of output shards explicitly", + "When using windowed writes, must specify number of output shards explicitly", WriteFiles.class.getSimpleName()); } this.writeOperation = sink.createWriteOperation(); @@ -502,15 +507,16 @@ public class WriteFiles<UserT, DestinationT, OutputT> if (writer == null) { LOG.debug("Opening writer for write operation {}", writeOperation); writer = writeOperation.createWriter(); + int shardNumber = + shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING + ? c.element().getKey().getShardNumber() + : UNKNOWN_SHARDNUM; if (windowedWrites) { - int shardNumber = - shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING - ? c.element().getKey().getShardNumber() - : UNKNOWN_SHARDNUM; writer.openWindowed( UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination); } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination); + writer.openUnwindowed( + UUID.randomUUID().toString(), shardNumber, destination); } LOG.debug("Done opening writer"); writers.put(destination, writer); @@ -532,7 +538,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> throw e; } } - } + } @Override public void populateDisplayData(DisplayData.Builder builder) { @@ -615,13 +621,15 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - Multimap<DestinationT, FileResult<DestinationT>> perDestinationResults( - Iterable<FileResult<DestinationT>> results) { - Multimap<DestinationT, FileResult<DestinationT>> perDestination = ArrayListMultimap.create(); + private static <DestinationT> + Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> + groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) { + Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res = + ArrayListMultimap.create(); for (FileResult<DestinationT> result : results) { - perDestination.put(result.getDestination(), result); + res.put(KV.of(result.getDestination(), result.getWindow()), result); } - return perDestination; + return res; } /** @@ -752,51 +760,28 @@ public class WriteFiles<UserT, DestinationT, OutputT> PCollection<KV<DestinationT, String>> outputFilenames; if (windowedWrites) { - // When processing streaming windowed writes, results will arrive multiple times. This - // means we can't share the below implementation that turns the results into a side input, - // as new data arriving into a side input does not trigger the listening DoFn. Instead - // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered - // whenever new data arrives. - PCollection<KV<Void, FileResult<DestinationT>>> keyedResults = - results.apply( - "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null)); - keyedResults.setCoder( - KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder))); - - // Is the continuation trigger sufficient? + // We need to materialize the FileResult's before the renaming stage: this can be done either + // via a side input or via a GBK. However, when processing streaming windowed writes, results + // will arrive multiple times. This means we can't share the below implementation that turns + // the results into a side input, as new data arriving into a side input does not trigger the + // listening DoFn. We also can't use a GBK because we need only the materialization, but not + // the (potentially lossy, if the user's trigger is lossy) continuation triggering that GBK + // would give. So, we use a reshuffle (over a single key to maximize bundling). outputFilenames = - keyedResults - .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create()) + results + .apply(WithKeys.<Void, FileResult<DestinationT>>of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), results.getCoder())) + .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of()) + .apply(Values.<FileResult<DestinationT>>create()) .apply( "FinalizeWindowed", ParDo.of( - new DoFn< - KV<Void, Iterable<FileResult<DestinationT>>>, - KV<DestinationT, String>>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Set<ResourceId> tempFiles = Sets.newHashSet(); - Multimap<DestinationT, FileResult<DestinationT>> results = - perDestinationResults(c.element().getValue()); - for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry : - results.asMap().entrySet()) { - LOG.info( - "Finalizing write operation {} for destination {} num shards: {}.", - writeOperation, - entry.getKey(), - entry.getValue().size()); - Map<ResourceId, ResourceId> finalizeMap = - writeOperation.finalize(entry.getValue()); - tempFiles.addAll(finalizeMap.keySet()); - for (ResourceId outputFile : finalizeMap.values()) { - c.output(KV.of(entry.getKey(), outputFile.toString())); - } - LOG.debug("Done finalizing write operation for {}.", entry.getKey()); - } - writeOperation.removeTemporaryFiles(tempFiles); - LOG.debug("Removed temporary files for {}.", writeOperation); - } - })) + new FinalizeWindowedFn<DestinationT>( + numShardsView, numShardsProvider, writeOperation)) + .withSideInputs( + numShardsView == null + ? ImmutableList.<PCollectionView<?>>of() + : ImmutableList.of(numShardsView))) .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); } else { final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = @@ -817,58 +802,15 @@ public class WriteFiles<UserT, DestinationT, OutputT> // set numShards, then all shards will be written out as empty files. For this reason we // use a side input here. PCollection<Void> singletonCollection = p.apply(Create.of((Void) null)); - outputFilenames = singletonCollection.apply( - "FinalizeUnwindowed", - ParDo.of( - new DoFn<Void, KV<DestinationT, String>>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); - // We must always output at least 1 shard, and honor user-specified numShards - // if set. - int minShardsNeeded; - if (numShardsView != null) { - minShardsNeeded = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - minShardsNeeded = numShardsProvider.get(); - } else { - minShardsNeeded = 1; - } - Set<ResourceId> tempFiles = Sets.newHashSet(); - Multimap<DestinationT, FileResult<DestinationT>> perDestination = - perDestinationResults(c.sideInput(resultsView)); - for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry : - perDestination.asMap().entrySet()) { - Map<ResourceId, ResourceId> finalizeMap = Maps.newHashMap(); - finalizeMap.putAll( - finalizeForDestinationFillEmptyShards( - entry.getKey(), entry.getValue(), minShardsNeeded)); - tempFiles.addAll(finalizeMap.keySet()); - for (ResourceId outputFile :finalizeMap.values()) { - c.output(KV.of(entry.getKey(), outputFile.toString())); - } - } - if (perDestination.isEmpty()) { - // If there is no input at all, write empty files to the default - // destination. - Map<ResourceId, ResourceId> finalizeMap = Maps.newHashMap(); - DestinationT destination = - getSink().getDynamicDestinations().getDefaultDestination(); - finalizeMap.putAll( - finalizeForDestinationFillEmptyShards( - destination, - Lists.<FileResult<DestinationT>>newArrayList(), - minShardsNeeded)); - tempFiles.addAll(finalizeMap.keySet()); - for (ResourceId outputFile :finalizeMap.values()) { - c.output(KV.of(destination, outputFile.toString())); - } - } - writeOperation.removeTemporaryFiles(tempFiles); - } - }) - .withSideInputs(finalizeSideInputs.build())) - .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); + outputFilenames = + singletonCollection + .apply( + "FinalizeUnwindowed", + ParDo.of( + new FinalizeUnwindowedFn<>( + numShardsView, numShardsProvider, resultsView, writeOperation)) + .withSideInputs(finalizeSideInputs.build())) + .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); } TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag = @@ -879,41 +821,196 @@ public class WriteFiles<UserT, DestinationT, OutputT> outputFilenames); } - /** - * Finalize a list of files for a single destination. If a minimum number of shards is needed, - * this function will generate empty files for this destination to ensure that all shards are - * generated. - */ - private Map<ResourceId, ResourceId> finalizeForDestinationFillEmptyShards( - DestinationT destination, Collection<FileResult<DestinationT>> results, int minShardsNeeded) - throws Exception { - checkState(!windowedWrites); - - LOG.info( - "Finalizing write operation {} for destination {} num shards {}.", - writeOperation, - destination, - results.size()); - int extraShardsNeeded = minShardsNeeded - results.size(); - if (extraShardsNeeded > 0) { + private static class FinalizeWindowedFn<DestinationT> + extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> { + @Nullable private final PCollectionView<Integer> numShardsView; + @Nullable private final ValueProvider<Integer> numShardsProvider; + private final WriteOperation<DestinationT, ?> writeOperation; + + @Nullable private transient List<FileResult<DestinationT>> fileResults; + @Nullable private Integer fixedNumShards; + + public FinalizeWindowedFn( + @Nullable PCollectionView<Integer> numShardsView, + @Nullable ValueProvider<Integer> numShardsProvider, + WriteOperation<DestinationT, ?> writeOperation) { + this.numShardsView = numShardsView; + this.numShardsProvider = numShardsProvider; + this.writeOperation = writeOperation; + } + + @StartBundle + public void startBundle() { + fileResults = Lists.newArrayList(); + fixedNumShards = null; + } + + @ProcessElement + public void processElement(ProcessContext c) { + fileResults.add(c.element()); + if (fixedNumShards == null) { + if (numShardsView != null) { + fixedNumShards = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + fixedNumShards = numShardsProvider.get(); + } else { + fixedNumShards = null; + } + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext c) throws Exception { + Set<ResourceId> tempFiles = Sets.newHashSet(); + Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results = + groupByDestinationAndWindow(fileResults); + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); + for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>> + destEntry : results.asMap().entrySet()) { + DestinationT destination = destEntry.getKey().getKey(); + BoundedWindow window = destEntry.getKey().getValue(); + resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames( + destination, window, fixedNumShards, destEntry.getValue())); + } + LOG.info("Will finalize {} files", resultsToFinalFilenames.size()); + for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { + FileResult<DestinationT> res = entry.getKey(); + tempFiles.add(res.getTempFilename()); + c.output( + KV.of(res.getDestination(), entry.getValue().toString()), + res.getWindow().maxTimestamp(), + res.getWindow()); + } + writeOperation.copyToOutputFiles(resultsToFinalFilenames); + writeOperation.removeTemporaryFiles(tempFiles); + } + } + + private static class FinalizeUnwindowedFn<DestinationT> + extends DoFn<Void, KV<DestinationT, String>> { + @Nullable private final PCollectionView<Integer> numShardsView; + @Nullable private final ValueProvider<Integer> numShardsProvider; + private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView; + private final WriteOperation<DestinationT, ?> writeOperation; + + public FinalizeUnwindowedFn( + @Nullable PCollectionView<Integer> numShardsView, + @Nullable ValueProvider<Integer> numShardsProvider, + PCollectionView<Iterable<FileResult<DestinationT>>> resultsView, + WriteOperation<DestinationT, ?> writeOperation) { + this.numShardsView = numShardsView; + this.numShardsProvider = numShardsProvider; + this.resultsView = resultsView; + this.writeOperation = writeOperation; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c); + @Nullable Integer fixedNumShards; + if (numShardsView != null) { + fixedNumShards = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + fixedNumShards = numShardsProvider.get(); + } else { + fixedNumShards = null; + } + Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap = + ArrayListMultimap.create(); + for (FileResult<DestinationT> result : c.sideInput(resultsView)) { + resultsByDestMultimap.put(result.getDestination(), result); + } + Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest = + resultsByDestMultimap.asMap(); + if (resultsByDest.isEmpty()) { + Collection<FileResult<DestinationT>> empty = ImmutableList.of(); + resultsByDest = + Collections.singletonMap( + writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty); + } + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); + for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> + destEntry : resultsByDest.entrySet()) { + resultsToFinalFilenames.addAll( + finalizeForDestinationFillEmptyShards( + destEntry.getKey(), fixedNumShards, destEntry.getValue())); + } + Set<ResourceId> tempFiles = Sets.newHashSet(); + for (KV<FileResult<DestinationT>, ResourceId> entry : + resultsToFinalFilenames) { + tempFiles.add(entry.getKey().getTempFilename()); + c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString())); + } + writeOperation.copyToOutputFiles(resultsToFinalFilenames); + writeOperation.removeTemporaryFiles(tempFiles); + } + + /** + * Finalize a list of files for a single destination. If a minimum number of shards is needed, + * this function will generate empty files for this destination to ensure that all shards are + * generated. + */ + private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards( + DestinationT destination, + @Nullable Integer fixedNumShards, + Collection<FileResult<DestinationT>> existingResults) + throws Exception { + checkState(!writeOperation.windowedWrites); + LOG.info( - "Creating {} empty output shards in addition to {} written " - + "for a total of {} for destination {}.", - extraShardsNeeded, - results.size(), - minShardsNeeded, - destination); - for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<DestinationT, OutputT> writer = writeOperation.createWriter(); - // Currently this code path is only called in the unwindowed case. - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination); - FileResult<DestinationT> emptyWrite = writer.close(); - results.add(emptyWrite); + "Finalizing write operation {} for destination {} num shards {}.", + writeOperation, + destination, + existingResults.size()); + if (fixedNumShards != null) { + checkArgument( + existingResults.size() <= fixedNumShards, + "Fixed sharding into %s shards was specified, but got %s file results", + fixedNumShards, + existingResults.size()); + } + // We must always output at least 1 shard, and honor user-specified numShards + // if set. + Set<Integer> missingShardNums; + if (fixedNumShards == null) { + missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM); + } else { + missingShardNums = Sets.newHashSet(); + for (int i = 0; i < fixedNumShards; ++i) { + missingShardNums.add(i); + } + for (FileResult<DestinationT> res : existingResults) { + checkArgument( + res.getShard() != UNKNOWN_SHARDNUM, + "Fixed sharding into %s shards was specified, " + + "but file result %s does not specify a shard", + fixedNumShards, + res); + missingShardNums.remove(res.getShard()); + } + } + List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults); + if (!missingShardNums.isEmpty()) { + LOG.info( + "Creating {} empty output shards in addition to {} written for destination {}.", + missingShardNums.size(), + existingResults.size(), + destination); + for (int shard : missingShardNums) { + Writer<DestinationT, ?> writer = writeOperation.createWriter(); + // Currently this code path is only called in the unwindowed case. + writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination); + FileResult<DestinationT> emptyWrite = writer.close(); + completeResults.add(emptyWrite); + } + LOG.debug("Done creating extra shards for {}.", destination); } - LOG.debug("Done creating extra shards for {}.", destination); + return + writeOperation.buildOutputFilenames( + destination, + null, + (fixedNumShards == null) ? null : completeResults.size(), + completeResults); } - Map<ResourceId, ResourceId> finalizeMap = writeOperation.finalize(results); - LOG.debug("Done finalizing write operation {} for destination {}", writeOperation, destination); - return finalizeMap; } } http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 0a96b7e..29f3c1b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -25,6 +27,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -41,9 +44,8 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.zip.GZIPInputStream; import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileResult; @@ -52,6 +54,7 @@ import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.values.KV; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.junit.Rule; @@ -97,7 +100,7 @@ public class FileBasedSinkTest { expected.addAll(values); expected.add(SimpleSink.SimpleWriter.FOOTER); - SimpleSink.SimpleWriter writer = + SimpleSink.SimpleWriter<Void> writer = buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter(); writer.openUnwindowed(testUid, -1, null); for (String value : values) { @@ -186,7 +189,7 @@ public class FileBasedSinkTest { } /** Finalize and verify that files are copied and temporary files are optionally removed. */ - private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles) + private void runFinalize(SimpleSink.SimpleWriteOperation<Void> writeOp, List<File> temporaryFiles) throws Exception { int numFiles = temporaryFiles.size(); @@ -196,13 +199,21 @@ public class FileBasedSinkTest { fileResults.add( new FileResult<Void>( LocalResources.fromFile(temporaryFiles.get(i), false), - WriteFiles.UNKNOWN_SHARDNUM, + UNKNOWN_SHARDNUM, null, null, null)); } - writeOp.removeTemporaryFiles(writeOp.finalize(fileResults).keySet()); + // TODO: test with null first argument? + List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames = + writeOp.buildOutputFilenames(null, null, null, fileResults); + Set<ResourceId> tempFiles = Sets.newHashSet(); + for (KV<FileResult<Void>, ResourceId> res : resultsToFinalFilenames) { + tempFiles.add(res.getKey().getTempFilename()); + } + writeOp.copyToOutputFiles(resultsToFinalFilenames); + writeOp.removeTemporaryFiles(tempFiles); for (int i = 0; i < numFiles; i++) { ResourceId outputFilename = @@ -263,14 +274,14 @@ public class FileBasedSinkTest { /** Output files are copied to the destination location with the correct names and contents. */ @Test public void testCopyToOutputFiles() throws Exception { - SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); + SimpleSink.SimpleWriteOperation<Void> writeOp = buildWriteOperation(); List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3"); List<String> inputContents = Arrays.asList("1", "2", "3"); List<String> expectedOutputFilenames = Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test"); - Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>(); - List<ResourceId> expectedOutputPaths = new ArrayList<>(); + List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); + List<ResourceId> expectedOutputPaths = Lists.newArrayList(); for (int i = 0; i < inputFilenames.size(); i++) { // Generate output paths. @@ -282,17 +293,20 @@ public class FileBasedSinkTest { File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i)); List<String> lines = Collections.singletonList(inputContents.get(i)); writeFile(lines, inputTmpFile); - inputFilePaths.put( - LocalResources.fromFile(inputTmpFile, false), - writeOp - .getSink() - .getDynamicDestinations() - .getFilenamePolicy(null) - .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED)); + ResourceId finalFilename = writeOp + .getSink() + .getDynamicDestinations() + .getFilenamePolicy(null) + .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED); + resultsToFinalFilenames.add( + KV.of( + new FileResult<Void>( + LocalResources.fromFile(inputTmpFile, false), UNKNOWN_SHARDNUM, null, null, null), + finalFilename)); } // Copy input files to output files. - writeOp.copyToOutputFiles(inputFilePaths); + writeOp.copyToOutputFiles(resultsToFinalFilenames); // Assert that the contents were copied. for (int i = 0; i < expectedOutputPaths.size(); i++) { @@ -302,7 +316,7 @@ public class FileBasedSinkTest { } public List<ResourceId> generateDestinationFilenames( - ResourceId outputDirectory, FilenamePolicy policy, int numFiles) { + FilenamePolicy policy, int numFiles) { List<ResourceId> filenames = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { filenames.add(policy.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED)); @@ -327,17 +341,17 @@ public class FileBasedSinkTest { root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE), root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE), root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE)); - actual = generateDestinationFilenames(root, policy, 3); + actual = generateDestinationFilenames(policy, 3); assertEquals(expected, actual); expected = Collections.singletonList( root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE)); - actual = generateDestinationFilenames(root, policy, 1); + actual = generateDestinationFilenames(policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = generateDestinationFilenames(root, policy, 0); + actual = generateDestinationFilenames(policy, 0); assertEquals(expected, actual); } @@ -352,18 +366,19 @@ public class FileBasedSinkTest { ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE); ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE); ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE); - ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE); // More than one shard does. try { Iterable<FileResult<Void>> results = Lists.newArrayList( - new FileResult<Void>(temp1, 1, null, null, null), - new FileResult<Void>(temp2, 1, null, null, null), - new FileResult<Void>(temp3, 1, null, null, null)); - writeOp.buildOutputFilenames(results); + new FileResult<Void>(temp1, 1 /* shard */, null, null, null), + new FileResult<Void>(temp2, 1 /* shard */, null, null, null), + new FileResult<Void>(temp3, 1 /* shard */, null, null, null)); + writeOp.buildOutputFilenames(null, null, 5 /* numShards */, results); fail("Should have failed."); - } catch (IllegalStateException exn) { - assertEquals("Only generated 1 distinct file names for 3 files.", exn.getMessage()); + } catch (IllegalArgumentException exn) { + assertThat(exn.getMessage(), containsString("generated the same name")); + assertThat(exn.getMessage(), containsString("temp1")); + assertThat(exn.getMessage(), containsString("temp2")); } } @@ -383,17 +398,17 @@ public class FileBasedSinkTest { root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE), root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE), root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE)); - actual = generateDestinationFilenames(root, policy, 3); + actual = generateDestinationFilenames(policy, 3); assertEquals(expected, actual); expected = Collections.singletonList( root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)); - actual = generateDestinationFilenames(root, policy, 1); + actual = generateDestinationFilenames(policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = generateDestinationFilenames(root, policy, 0); + actual = generateDestinationFilenames(policy, 0); assertEquals(expected, actual); } http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java index 0f40067..1ade0d0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java @@ -38,12 +38,8 @@ import com.google.common.collect.Lists; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; -import java.io.IOException; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -68,50 +64,28 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; /** Tests for {@link TextIO.Write}. */ public class TextIOWriteTest { private static final String MY_HEADER = "myHeader"; private static final String MY_FOOTER = "myFooter"; - private static Path tempFolder; + @Rule public transient TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule public TestPipeline p = TestPipeline.create(); + @Rule public transient TestPipeline p = TestPipeline.create(); - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @BeforeClass - public static void setupClass() throws IOException { - tempFolder = Files.createTempDirectory("TextIOTest"); - } - - @AfterClass - public static void teardownClass() throws IOException { - Files.walkFileTree( - tempFolder, - new SimpleFileVisitor<Path>() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) - throws IOException { - Files.delete(file); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - Files.delete(dir); - return FileVisitResult.CONTINUE; - } - }); - } + @Rule public transient ExpectedException expectedException = ExpectedException.none(); static class TestDynamicDestinations extends FileBasedSink.DynamicDestinations<String, String, String> { @@ -174,7 +148,9 @@ public class TextIOWriteTest { public void testDynamicDestinations() throws Exception { ResourceId baseDir = FileSystems.matchNewResource( - Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true); + Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations") + .toString(), + true); List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab"); PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of())); @@ -262,7 +238,9 @@ public class TextIOWriteTest { public void testDynamicDefaultFilenamePolicy() throws Exception { ResourceId baseDir = FileSystems.matchNewResource( - Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true); + Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations") + .toString(), + true); List<UserWriteType> elements = Lists.newArrayList( @@ -371,7 +349,7 @@ public class TextIOWriteTest { private void runTestWrite(String[] elems, String header, String footer, int numShards) throws Exception { String outputName = "file.txt"; - Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); + Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite"); ResourceId baseFilename = FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); @@ -544,7 +522,7 @@ public class TextIOWriteTest { String outputName = "file.txt"; ResourceId baseDir = FileSystems.matchNewResource( - Files.createTempDirectory(tempFolder, "testwrite").toString(), true); + Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true); PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder)); @@ -640,4 +618,34 @@ public class TextIOWriteTest { p.apply(Create.of("")).apply(TextIO.write().to(options.getOutput())); } + + @Test + @Category(NeedsRunner.class) + public void testWindowedWritesWithOnceTrigger() throws Throwable { + // Tests for https://issues.apache.org/jira/browse/BEAM-3169 + PCollection<String> data = + p.apply(Create.of("0", "1", "2")) + .apply( + Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))) + // According to this trigger, all data should be written. + // However, the continuation of this trigger is elementCountAtLeast(1), + // so with a buggy implementation that used a GBK before renaming files, + // only 1 file would be renamed. + .triggering(AfterPane.elementCountAtLeast(3)) + .withAllowedLateness(Duration.standardMinutes(1)) + .discardingFiredPanes()); + PCollection<String> filenames = + data.apply( + TextIO.write() + .to(new File(tempFolder.getRoot(), "windowed-writes").getAbsolutePath()) + .withNumShards(2) + .withWindowedWrites() + .<Void>withOutputFilenames()) + .getPerDestinationOutputFilenames() + .apply(Values.<String>create()); + + PAssert.that(filenames.apply(TextIO.readAll())).containsInAnyOrder("0", "1", "2"); + + p.run(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index e0f7b39..40ae0ea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -21,11 +21,14 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -35,7 +38,6 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.nio.file.Paths; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Arrays; @@ -83,6 +85,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.commons.compress.utils.Sets; +import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; @@ -156,10 +159,6 @@ public class WriteFilesTest { } } - private String appendToTempFolder(String filename) { - return Paths.get(tmpFolder.getRoot().getPath(), filename).toString(); - } - private String getBaseOutputFilename() { return getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE).toString(); } @@ -187,7 +186,11 @@ public class WriteFilesTest { IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); - checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1)); + checkFileContents( + getBaseOutputFilename(), + Collections.<String>emptyList(), + Optional.of(1), + true /* expectRemovedTempDirectory */); } /** @@ -241,7 +244,8 @@ public class WriteFilesTest { p.run(); - checkFileContents(getBaseOutputFilename(), inputs, Optional.of(3)); + checkFileContents( + getBaseOutputFilename(), inputs, Optional.of(3), true /* expectRemovedTempDirectory */); } /** @@ -314,7 +318,10 @@ public class WriteFilesTest { inputs, Window.<String>into(FixedWindows.of(Duration.millis(2))), getBaseOutputFilename(), - WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites()); + WriteFiles.to(makeSimpleSink()) + .withMaxNumWritersPerBundle(2) + .withWindowedWrites() + .withNumShards(1)); } public void testBuildWrite() { @@ -379,11 +386,10 @@ public class WriteFilesTest { @Test @Category(NeedsRunner.class) - public void testUnboundedNeedsSharding() { + public void testWindowedWritesNeedSharding() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage( - "When applying WriteFiles to an unbounded PCollection, " - + "must specify number of output shards explicitly"); + "When using windowed writes, must specify number of output shards explicitly"); SimpleSink<Void> sink = makeSimpleSink(); p.apply(Create.of("foo")) @@ -491,7 +497,11 @@ public class WriteFilesTest { for (int j = i; j < numInputs; j += 5) { expected.add("record_" + j); } - checkFileContents(base.toString(), expected, Optional.of(numShards)); + checkFileContents( + base.toString(), + expected, + Optional.of(numShards), + bounded /* expectRemovedTempDirectory */); } } @@ -659,14 +669,15 @@ public class WriteFilesTest { p.run(); Optional<Integer> numShards = - (write.getNumShards() != null) + (write.getNumShards() != null && !write.isWindowedWrites()) ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent(); - checkFileContents(baseName, inputs, numShards); + checkFileContents(baseName, inputs, numShards, !write.isWindowedWrites()); } static void checkFileContents( - String baseName, List<String> inputs, Optional<Integer> numExpectedShards) + String baseName, List<String> inputs, Optional<Integer> numExpectedShards, + boolean expectRemovedTempDirectory) throws IOException { List<File> outputFiles = Lists.newArrayList(); final String pattern = baseName + "*"; @@ -675,6 +686,7 @@ public class WriteFilesTest { for (Metadata meta : metadata) { outputFiles.add(new File(meta.resourceId().toString())); } + assertFalse("Should have produced at least 1 output file", outputFiles.isEmpty()); if (numExpectedShards.isPresent()) { assertEquals(numExpectedShards.get().intValue(), outputFiles.size()); Pattern shardPattern = Pattern.compile("\\d{4}-of-\\d{4}"); @@ -710,6 +722,11 @@ public class WriteFilesTest { } } assertThat(actual, containsInAnyOrder(inputs.toArray())); + if (expectRemovedTempDirectory) { + assertThat( + Lists.newArrayList(new File(baseName).getParentFile().list()), + Matchers.everyItem(not(containsString(FileBasedSink.TEMP_DIRECTORY_PREFIX)))); + } } /** Options for test, exposed for PipelineOptionsFactory. */
