Repository: beam Updated Branches: refs/heads/master 47273b969 -> 1f1df2722
Fixes sharding and cleanup for dynamic file writes - Sharding was applied across all the destinations at the same time, instead of having each destination produce the requested number of shards - Fixes a bunch of issues in cleanup of temporary files in case of multiple destinations Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca406636 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca406636 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca406636 Branch: refs/heads/master Commit: ca4066366bca9ed5ec97859e269875573af7adfc Parents: 47273b9 Author: Reuven Lax <[email protected]> Authored: Tue Jul 11 19:19:42 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Sat Jul 15 10:56:54 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/DefaultFilenamePolicy.java | 27 +++++ .../org/apache/beam/sdk/io/FileBasedSink.java | 43 ++++--- .../java/org/apache/beam/sdk/io/WriteFiles.java | 117 ++++++++++++++----- .../apache/beam/sdk/io/FileBasedSinkTest.java | 2 +- .../org/apache/beam/sdk/io/WriteFilesTest.java | 66 +++++++++-- 5 files changed, 198 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 64d7edc..4021609 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -20,6 +20,8 @@ package org.apache.beam.sdk.io; import static com.google.common.base.MoreObjects.firstNonNull; import 
com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -139,6 +141,31 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { public Params withSuffix(String suffix) { return new Params(baseFilename, shardTemplate, suffix, explicitTemplate); } + + @Override + public int hashCode() { + return Objects.hashCode(baseFilename.get(), shardTemplate, suffix); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Params)) { + return false; + } + Params other = (Params) o; + return baseFilename.get().equals(other.baseFilename.get()) + && shardTemplate.equals(other.shardTemplate) + && suffix.equals(other.suffix); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("baseFilename", baseFilename) + .add("shardTemplate", shardTemplate) + .add("suffix", suffix) + .toString(); + } } /** A Coder for {@link Params}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index c68b794..9953975 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -218,7 +218,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab implements HasDisplayData, Serializable { /** * Returns an object that represents at a high level the destination being written to. May not - * return null. + * return null. A destination must have deterministic hash and equality methods defined. 
*/ public abstract DestinationT getDestination(UserT element); @@ -486,8 +486,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } /** - * Finalizes writing by copying temporary output files to their final location and optionally - * removing temporary files. + * Finalizes writing by copying temporary output files to their final location. * * <p>Finalization may be overridden by subclass implementations to perform customized * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code @@ -497,23 +496,37 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It * is a best practice to attempt to try to make this method atomic. * + * <p>Returns the set of temporary files generated. Callers must call {@link + * #removeTemporaryFiles(Set)} to clean up these files. + * * @param writerResults the results of writes (FileResult). */ - public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception { - // Collect names of temporary files and rename them. + public Set<ResourceId> finalize(Iterable<FileResult<DestinationT>> writerResults) + throws Exception { + // Collect names of temporary files and copy them. Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults); copyToOutputFiles(outputFilenames); + return outputFilenames.keySet(); + } - // Optionally remove temporary files. - // We remove the entire temporary directory, rather than specifically removing the files - // from writerResults, because writerResults includes only successfully completed bundles, - // and we'd like to clean up the failed ones too. - // Note that due to GCS eventual consistency, matching files in the temp directory is also - // currently non-perfect and may fail to delete some files. 
- // - // When windows or triggers are specified, files are generated incrementally so deleting - // the entire directory in finalize is incorrect. - removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites); + /** + * Remove temporary files after finalization. + * + * <p>In the case where we are doing global-window, untriggered writes, we remove the entire + * temporary directory, rather than specifically removing the files from writerResults, because + * writerResults includes only successfully completed bundles, and we'd like to clean up the + * failed ones too. The reason we remove files here rather than in finalize is that finalize + * might be called multiple times (e.g. if the bundle contained multiple destinations), and + * deleting the entire directory can't be done until all calls to finalize. + * + * <p>When windows or triggers are specified, files are generated incrementally so deleting the + * entire directory in finalize is incorrect. If windowedWrites is true, we instead delete the + * files individually. This means that some temporary files generated by failed bundles might + * not be cleaned up. Note that {@link WriteFiles} does attempt to clean up files if exceptions + * are thrown, however there are still some scenarios where temporary files might be left. 
+ */ + public void removeTemporaryFiles(Set<ResourceId> filenames) throws IOException { + removeTemporaryFiles(filenames, !windowedWrites); } @Experimental(Kind.FILESYSTEM) http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 7013044..d8d7478 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -19,15 +19,21 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Objects; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import com.google.common.hash.Hashing; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; @@ -44,6 +50,7 @@ import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; import org.apache.beam.sdk.io.FileBasedSink.Writer; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -594,6 
+601,15 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } + Multimap<DestinationT, FileResult<DestinationT>> perDestinationResults( + Iterable<FileResult<DestinationT>> results) { + Multimap<DestinationT, FileResult<DestinationT>> perDestination = ArrayListMultimap.create(); + for (FileResult<DestinationT> result : results) { + perDestination.put(result.getDestination(), result); + } + return perDestination; + } + /** * A write is performed as sequence of three {@link ParDo}'s. * @@ -737,11 +753,21 @@ public class WriteFiles<UserT, DestinationT, OutputT> new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { - LOG.info("Finalizing write operation {}.", writeOperation); - List<FileResult<DestinationT>> results = - Lists.newArrayList(c.element().getValue()); - writeOperation.finalize(results); - LOG.debug("Done finalizing write operation"); + Set<ResourceId> tempFiles = Sets.newHashSet(); + Multimap<DestinationT, FileResult<DestinationT>> results = + perDestinationResults(c.element().getValue()); + for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry : + results.asMap().entrySet()) { + LOG.info( + "Finalizing write operation {} for destination {} num shards: {}.", + writeOperation, + entry.getKey(), + entry.getValue().size()); + tempFiles.addAll(writeOperation.finalize(entry.getValue())); + LOG.debug("Done finalizing write operation for {}.", entry.getKey()); + } + writeOperation.removeTemporaryFiles(tempFiles); + LOG.debug("Removed temporary files for {}.", writeOperation); } })); } else { @@ -769,11 +795,6 @@ public class WriteFiles<UserT, DestinationT, OutputT> @ProcessElement public void processElement(ProcessContext c) throws Exception { LOG.info("Finalizing write operation {}.", writeOperation); - List<FileResult<DestinationT>> results = - Lists.newArrayList(c.sideInput(resultsView)); - LOG.debug( - "Side input initialized to finalize 
write operation {}.", writeOperation); - // We must always output at least 1 shard, and honor user-specified numShards // if // set. @@ -785,31 +806,67 @@ public class WriteFiles<UserT, DestinationT, OutputT> } else { minShardsNeeded = 1; } - int extraShardsNeeded = minShardsNeeded - results.size(); - if (extraShardsNeeded > 0) { - LOG.info( - "Creating {} empty output shards in addition to {} written " - + "for a total of {}.", - extraShardsNeeded, - results.size(), - minShardsNeeded); - for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<OutputT, DestinationT> writer = writeOperation.createWriter(); - writer.openUnwindowed( - UUID.randomUUID().toString(), - UNKNOWN_SHARDNUM, - sink.getDynamicDestinations().getDefaultDestination()); - FileResult<DestinationT> emptyWrite = writer.close(); - results.add(emptyWrite); - } - LOG.debug("Done creating extra shards."); + Set<ResourceId> tempFiles = Sets.newHashSet(); + Multimap<DestinationT, FileResult<DestinationT>> perDestination = + perDestinationResults(c.sideInput(resultsView)); + for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry : + perDestination.asMap().entrySet()) { + tempFiles.addAll( + finalizeForDestinationFillEmptyShards( + entry.getKey(), entry.getValue(), minShardsNeeded)); } - writeOperation.finalize(results); - LOG.debug("Done finalizing write operation {}", writeOperation); + if (perDestination.isEmpty()) { + // If there is no input at all, write empty files to the default + // destination. + tempFiles.addAll( + finalizeForDestinationFillEmptyShards( + getSink().getDynamicDestinations().getDefaultDestination(), + Lists.<FileResult<DestinationT>>newArrayList(), + minShardsNeeded)); + } + writeOperation.removeTemporaryFiles(tempFiles); } }) .withSideInputs(sideInputs.build())); } return PDone.in(input.getPipeline()); } + + /** + * Finalize a list of files for a single destination. 
If a minimum number of shards is needed, + * this function will generate empty files for this destination to ensure that all shards are + * generated. + */ + private Set<ResourceId> finalizeForDestinationFillEmptyShards( + DestinationT destination, Collection<FileResult<DestinationT>> results, int minShardsNeeded) + throws Exception { + checkState(!windowedWrites); + + LOG.info( + "Finalizing write operation {} for destination {} num shards {}.", + writeOperation, + destination, + results.size()); + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written " + + "for a total of {} for destination {}.", + extraShardsNeeded, + results.size(), + minShardsNeeded, + destination); + for (int i = 0; i < extraShardsNeeded; ++i) { + Writer<OutputT, DestinationT> writer = writeOperation.createWriter(); + // Currently this code path is only called in the unwindowed case. + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination); + FileResult<DestinationT> emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards for {}.", destination); + } + Set<ResourceId> tempFiles = writeOperation.finalize(results); + LOG.debug("Done finalizing write operation {} for destination {}", writeOperation, destination); + return tempFiles; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index b756778..a6ad746 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -203,7 +203,7 @@ public 
class FileBasedSinkTest { null)); } - writeOp.finalize(fileResults); + writeOp.removeTemporaryFiles(writeOp.finalize(fileResults)); for (int i = 0; i < numFiles; i++) { ResourceId outputFilename = http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 1ca7169..60088de 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -35,11 +36,15 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.nio.file.Paths; +import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; @@ -80,6 +85,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.commons.compress.utils.Sets; import org.joda.time.Duration; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; @@ 
-449,16 +455,23 @@ public class WriteFilesTest { @Test @Category(NeedsRunner.class) public void testDynamicDestinationsBounded() throws Exception { - testDynamicDestinationsHelper(true); + testDynamicDestinationsHelper(true, false); } @Test @Category(NeedsRunner.class) public void testDynamicDestinationsUnbounded() throws Exception { - testDynamicDestinationsHelper(false); + testDynamicDestinationsHelper(false, false); } - private void testDynamicDestinationsHelper(boolean bounded) throws IOException { + @Test + @Category(NeedsRunner.class) + public void testDynamicDestinationsFillEmptyShards() throws Exception { + testDynamicDestinationsHelper(true, true); + } + + private void testDynamicDestinationsHelper(boolean bounded, boolean emptyShards) + throws IOException { TestDestinations dynamicDestinations = new TestDestinations(getBaseOutputDirectory()); SimpleSink<Integer> sink = new SimpleSink<>( @@ -469,15 +482,21 @@ public class WriteFilesTest { options.setTestFlag("test_value"); Pipeline p = TestPipeline.create(options); - List<String> inputs = Lists.newArrayList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + final int numInputs = 100; + List<String> inputs = Lists.newArrayList(); + for (int i = 0; i < numInputs; ++i) { + inputs.add(Integer.toString(i)); + } // Prepare timestamps for the elements. List<Long> timestamps = new ArrayList<>(); for (long i = 0; i < inputs.size(); i++) { timestamps.add(i + 1); } - + // If emptyShards==true make numShards larger than the number of elements per destination. + // This will force every destination to generate some empty shards. + int numShards = emptyShards ? 
2 * numInputs / 5 : 2; WriteFiles<String, Integer, String> writeFiles = - WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(1); + WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(numShards); PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps)); if (!bounded) { @@ -492,8 +511,11 @@ public class WriteFilesTest { for (int i = 0; i < 5; ++i) { ResourceId base = getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE); - List<String> expected = Lists.newArrayList("record_" + i, "record_" + (i + 5)); - checkFileContents(base.toString(), expected, Optional.of(1)); + List<String> expected = Lists.newArrayList(); + for (int j = i; j < numInputs; j += 5) { + expected.add("record_" + j); + } + checkFileContents(base.toString(), expected, Optional.of(numShards)); } } @@ -599,13 +621,14 @@ public class WriteFilesTest { BoundedWindow window, PaneInfo paneInfo, OutputFileHints outputFileHints) { + DecimalFormat df = new DecimalFormat("0000"); IntervalWindow intervalWindow = (IntervalWindow) window; String filename = String.format( "%s-%s-of-%s%s%s", filenamePrefixForWindow(intervalWindow), - shardNumber, - numShards, + df.format(shardNumber), + df.format(numShards), outputFileHints.getSuggestedFilenameSuffix(), suffix); return baseFilename @@ -616,12 +639,17 @@ public class WriteFilesTest { @Override public ResourceId unwindowedFilename( int shardNumber, int numShards, OutputFileHints outputFileHints) { + DecimalFormat df = new DecimalFormat("0000"); String prefix = baseFilename.isDirectory() ? 
"" : firstNonNull(baseFilename.getFilename(), ""); String filename = String.format( "%s-%s-of-%s%s%s", - prefix, shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix); + prefix, + df.format(shardNumber), + df.format(numShards), + outputFileHints.getSuggestedFilenameSuffix(), + suffix); return baseFilename .getCurrentDirectory() .resolve(filename, StandardResolveOptions.RESOLVE_FILE); @@ -674,6 +702,22 @@ public class WriteFilesTest { } if (numExpectedShards.isPresent()) { assertEquals(numExpectedShards.get().intValue(), outputFiles.size()); + Pattern shardPattern = Pattern.compile("\\d{4}-of-\\d{4}"); + + Set<String> expectedShards = Sets.newHashSet(); + DecimalFormat df = new DecimalFormat("0000"); + for (int i = 0; i < numExpectedShards.get(); i++) { + expectedShards.add( + String.format("%s-of-%s", df.format(i), df.format(numExpectedShards.get()))); + } + + Set<String> outputShards = Sets.newHashSet(); + for (File file : outputFiles) { + Matcher matcher = shardPattern.matcher(file.getName()); + assertTrue(matcher.find()); + assertTrue(outputShards.add(matcher.group())); + } + assertEquals(expectedShards, outputShards); } List<String> actual = Lists.newArrayList();
