Repository: incubator-beam Updated Branches: refs/heads/master 61d8cf2c4 -> 39da22c76
FileBasedSink: Detect bad shard name templates This is particularly relevant when TextIO.Write.withoutSharding() is used: [BEAM-159]. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d036bc8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d036bc8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d036bc8 Branch: refs/heads/master Commit: 4d036bc8a4b8c255d09467d09fe4ed7eae9dd035 Parents: 61d8cf2 Author: Robert Bradshaw <[email protected]> Authored: Wed Jun 8 14:59:56 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Jun 9 09:47:24 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSink.java | 16 +++++++++----- .../apache/beam/sdk/io/FileBasedSinkTest.java | 23 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d036bc8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 9048380..521f54b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import com.google.common.base.Preconditions; +import com.google.common.collect.Ordering; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,7 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; +import java.util.HashSet; import java.util.List; /** @@ -333,13 +334,10 @@ public abstract class FileBasedSink<T> extends Sink<T> { protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options) throws IOException { int numFiles = filenames.size(); - List<String> srcFilenames = new ArrayList<>(); + // Sort files for idempotence. + List<String> srcFilenames = Ordering.natural().sortedCopy(filenames); List<String> destFilenames = generateDestinationFilenames(numFiles); - // Sort files for copying. - srcFilenames.addAll(filenames); - Collections.sort(srcFilenames); - if (numFiles > 0) { LOG.debug("Copying {} files.", numFiles); FileOperations fileOperations = @@ -366,6 +364,12 @@ public abstract class FileBasedSink<T> extends Sink<T> { destFilenames.add(IOChannelUtils.constructName( baseOutputFilename, fileNamingTemplate, suffix, i, numFiles)); } + + int numDistinctShards = new HashSet<String>(destFilenames).size(); + Preconditions.checkState(numDistinctShards == numFiles, + "Shard name template '%s' only generated %s distinct file names for %s files.", + fileNamingTemplate, numDistinctShards, numFiles); + return destFilenames; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d036bc8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 0e434fc..d3454da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention; @@ -374,6 +375,28 @@ public class FileBasedSinkTest { } /** + * Reject non-distinct output filenames. + */ + @Test + public void testCollidingOutputFilenames() { + SimpleSink sink = new SimpleSink("output", "test", "-NN"); + SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink); + + // A single shard doesn't need to include the shard number. + assertEquals(Arrays.asList("output-01.test"), + writeOp.generateDestinationFilenames(1)); + + // More than one shard does. + try { + writeOp.generateDestinationFilenames(3); + fail("Should have failed."); + } catch (IllegalStateException exn) { + assertEquals("Shard name template '-NN' only generated 1 distinct file names for 3 files.", + exn.getMessage()); + } + } + + /** * Output filenames are generated correctly when an extension is not supplied. */ @Test
