Repository: beam Updated Branches: refs/heads/master 6d64c6ec1 -> 88f78fa2f
[BEAM-2276] Cleanups on the windowed DefaultFilenamePolicy Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e764167f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e764167f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e764167f Branch: refs/heads/master Commit: e764167f40e603ac00ac80758cac0108bcc49769 Parents: 6d64c6e Author: Reuven Lax <[email protected]> Authored: Thu May 25 23:42:17 2017 -0700 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Tue Jun 6 11:08:35 2017 +0200 ---------------------------------------------------------------------- .../construction/PTransformMatchersTest.java | 8 +- .../direct/WriteWithShardingFactoryTest.java | 23 +++-- .../java/org/apache/beam/sdk/io/AvroIO.java | 2 +- .../beam/sdk/io/DefaultFilenamePolicy.java | 103 +++++-------------- .../java/org/apache/beam/sdk/io/TFRecordIO.java | 2 +- .../java/org/apache/beam/sdk/io/TextIO.java | 2 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 11 +- .../beam/sdk/io/DefaultFilenamePolicyTest.java | 26 ++--- .../java/org/apache/beam/sdk/io/TextIOTest.java | 5 +- .../org/apache/beam/sdk/io/xml/XmlSink.java | 2 +- 10 files changed, 69 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index cfea62f..2497598 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -504,8 +504,12 @@ public class PTransformMatchersTest implements Serializable { @Test public void writeWithRunnerDeterminedSharding() { ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */); - FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters( - StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, ""); + FilenamePolicy policy = + DefaultFilenamePolicy.constructUsingStandardParameters( + StaticValueProvider.of(outputDirectory), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE, + "", + false); WriteFiles<Integer> write = WriteFiles.to( new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) { http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 5c4fea1..a88d95e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -129,15 +129,20 @@ public class WriteWithShardingFactoryTest { @Test public void withNoShardingSpecifiedReturnsNewTransform() { ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */); - FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters( - StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, ""); - WriteFiles<Object> original = WriteFiles.to( - new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) { - @Override - public WriteOperation<Object> createWriteOperation() { - throw new IllegalArgumentException("Should not be used"); - } - }); + FilenamePolicy policy = + DefaultFilenamePolicy.constructUsingStandardParameters( + StaticValueProvider.of(outputDirectory), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE, + "", + false); + WriteFiles<Object> original = + WriteFiles.to( + new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) { + @Override + public WriteOperation<Object> createWriteOperation() { + throw new IllegalArgumentException("Should not be used"); + } + }); @SuppressWarnings("unchecked") PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 6af0e79..4143db2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -451,7 +451,7 @@ public class AvroIO { FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); if (usedFilenamePolicy == null) { usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters( - getFilenamePrefix(), getShardTemplate(), getFilenameSuffix()); + getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); } WriteFiles<T> write = WriteFiles.to( http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 5073854..f9e4ac4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class); /** The default sharding name template used in {@link #constructUsingStandardParameters}. */ - public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; + public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The default windowed sharding name template used when writing windowed files. * This is used as default in cases when user did not specify shard template to @@ -63,27 +64,12 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { * windowed and non-windowed file names. */ private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE = - "P-W" + DEFAULT_SHARD_TEMPLATE; - - /* - * pattern for only non-windowed file names - */ - private static final String NON_WINDOWED_ONLY_PATTERN = "S+|N+"; - - /* - * pattern for only windowed file names - */ - private static final String WINDOWED_ONLY_PATTERN = "P|W"; + "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE; /* * pattern for both windowed and non-windowed file names */ - private static final String TEMPLATE_PATTERN = "(" + NON_WINDOWED_ONLY_PATTERN + "|" - + WINDOWED_ONLY_PATTERN + ")"; - - // Pattern that matches shard placeholders within a shard template. - private static final Pattern SHARD_FORMAT_RE = Pattern.compile(TEMPLATE_PATTERN); - private static final Pattern WINDOWED_FORMAT_RE = Pattern.compile(WINDOWED_ONLY_PATTERN); + private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)"); /** * Constructs a new {@link DefaultFilenamePolicy}. @@ -104,19 +90,23 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { * * <p>Any filename component of the provided resource will be used as the filename prefix. * - * <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE} - * will be used for non-windowed file names and {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will - * be used for windowed file names. + * <p>If provided, the shard name template will be used; otherwise + * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and + * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names. * * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix. */ public static DefaultFilenamePolicy constructUsingStandardParameters( ValueProvider<ResourceId> outputPrefix, @Nullable String shardTemplate, - @Nullable String filenameSuffix) { + @Nullable String filenameSuffix, + boolean windowedWrites) { + // Pick the appropriate default policy based on whether windowed writes are being performed. + String defaultTemplate = + windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE; return new DefaultFilenamePolicy( NestedValueProvider.of(outputPrefix, new ExtractFilename()), - firstNonNull(shardTemplate, DEFAULT_SHARD_TEMPLATE), + firstNonNull(shardTemplate, defaultTemplate), firstNonNull(filenameSuffix, "")); } @@ -124,19 +114,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { private final String shardTemplate; private final String suffix; - /* - * Checks whether given template contains enough information to form - * meaningful windowed file names - ie whether it uses pane and window - * info. - */ - static boolean isWindowedTemplate(String template){ - if (template != null){ - Matcher m = WINDOWED_FORMAT_RE.matcher(template); - return m.find(); - } - return false; - } - /** * Constructs a fully qualified name from components. * @@ -191,51 +168,23 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { return sb.toString(); } - static String constructName(String prefix, String shardTemplate, String suffix, int shardNum, - int numShards) { - return constructName(prefix, shardTemplate, suffix, shardNum, numShards, null, null); - } - @Override @Nullable public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) { - String filename = - constructName( - prefix.get(), shardTemplate, suffix, context.getShardNumber(), context.getNumShards()) - + extension; + String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(), + context.getNumShards(), null, null) + extension; return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { - - boolean shardTemplateProvidedByUser = !this.shardTemplate.equals(DEFAULT_SHARD_TEMPLATE); - - if (shardTemplateProvidedByUser){ - boolean isWindowed = isWindowedTemplate(this.shardTemplate); - if (!isWindowed){ - LOG.info("Template you provided {} does not have enough information to create" - + "meaningful windowed file names. Consider using P and W in your template", - this.shardTemplate); - } - } - final PaneInfo paneInfo = context.getPaneInfo(); String paneStr = paneInfoToString(paneInfo); String windowStr = windowToString(context.getWindow()); - - String templateToUse = shardTemplate; - if (!shardTemplateProvidedByUser){ - LOG.info("User did not provide shard template. For creating windowed file names " - + "default template {} will be used", DEFAULT_WINDOWED_SHARD_TEMPLATE); - templateToUse = DEFAULT_WINDOWED_SHARD_TEMPLATE; - } - - String filename = constructName(prefix.get(), templateToUse, suffix, - context.getShardNumber(), context.getNumShards(), paneStr, windowStr) - + extension; + String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(), + context.getNumShards(), paneStr, windowStr) + extension; return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @@ -248,18 +197,20 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { } if (window instanceof IntervalWindow) { IntervalWindow iw = (IntervalWindow) window; - return String.format("IntervalWindow-%s-%s", iw.start().toString(), - iw.end().toString()); + return String.format("%s-%s", iw.start().toString(), iw.end().toString()); } return window.toString(); } - private String paneInfoToString(PaneInfo paneInfo){ - long currentPaneIndex = (paneInfo == null ? -1L - : paneInfo.getIndex()); - boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst()); - boolean lastPane = (paneInfo == null ? false : paneInfo.isLast()); - return String.format("pane-%s-%b-%b", currentPaneIndex, firstPane, lastPane); + private String paneInfoToString(PaneInfo paneInfo) { + String paneString = String.format("pane-%d", paneInfo.getIndex()); + if (paneInfo.getTiming() == Timing.LATE) { + paneString = String.format("%s-late", paneString); + } + if (paneInfo.isLast()) { + paneString = String.format("%s-last", paneString); + } + return paneString; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index c274595..e288075 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -559,7 +559,7 @@ public class TFRecordIO { super( outputPrefix, DefaultFilenamePolicy.constructUsingStandardParameters( - outputPrefix, shardTemplate, suffix), + outputPrefix, shardTemplate, suffix, false), writableByteChannelFactory(compressionType)); } http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index afb5849..f1eb7c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -440,7 +440,7 @@ public class TextIO { FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); if (usedFilenamePolicy == null) { usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters( - getFilenamePrefix(), getShardTemplate(), getFilenameSuffix()); + getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites()); } WriteFiles<String> write = WriteFiles.to( http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index d71f2f7..6d01d32 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -479,7 +479,8 @@ public class AvroIOTest { p.run(); String shardNameTemplate = - firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE); + firstNonNull(write.getShardTemplate(), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate); } @@ -493,7 +494,13 @@ public class AvroIOTest { expectedFiles.add( new File( DefaultFilenamePolicy.constructName( - outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards))); + outputFilePrefix, + shardNameTemplate, + "" /* no suffix */, + i, + numShards, + null, + null))); } List<String> actualElements = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java index 787403b..217420c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -18,10 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName; -import static org.apache.beam.sdk.io.DefaultFilenamePolicy.isWindowedTemplate; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,36 +33,25 @@ public class DefaultFilenamePolicyTest { @Test public void testConstructName() { assertEquals("output-001-of-123.txt", - constructName("output", "-SSS-of-NNN", ".txt", 1, 123)); + constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null)); assertEquals("out.txt/part-00042", - constructName("out.txt", "/part-SSSSS", "", 42, 100)); + constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null)); assertEquals("out.txt", - constructName("ou", "t.t", "xt", 1, 1)); + constructName("ou", "t.t", "xt", 1, 1, null, null)); assertEquals("out0102shard.txt", - constructName("out", "SSNNshard", ".txt", 1, 2)); + constructName("out", "SSNNshard", ".txt", 1, 2, null, null)); assertEquals("out-2/1.part-1-of-2.txt", - constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2)); + constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null)); } @Test public void testConstructNameWithLargeShardCount() { assertEquals("out-100-of-5000.txt", - constructName("out", "-SS-of-NN", ".txt", 100, 5000)); - } - - @Test - public void testIsWindowedTemplate(){ - assertTrue(isWindowedTemplate("-SSS-of-NNN-P-W")); - assertTrue(isWindowedTemplate("-SSS-of-NNN-W")); - assertTrue(isWindowedTemplate("-SSS-of-NNN-P")); - assertTrue(isWindowedTemplate("W-SSS-of-NNN")); - - assertFalse(isWindowedTemplate("-SSS-of-NNN")); - assertFalse(isWindowedTemplate("-SSS-of-lp")); + constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null)); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 6c7a53f..9468893 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -312,7 +312,8 @@ public class TextIOTest { p.run(); assertOutputFiles(elems, header, footer, numShards, baseDir, outputName, - firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE)); + firstNonNull(write.getShardTemplate(), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE)); } public static void assertOutputFiles( @@ -337,7 +338,7 @@ public class TextIOTest { new File( rootLocation.toString(), DefaultFilenamePolicy.constructName( - outputName, shardNameTemplate, "", i, numShards))); + outputName, shardNameTemplate, "", i, numShards, null, null))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java index 60075a7..6ae83f2 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -41,7 +41,7 @@ class XmlSink<T> extends FileBasedSink<T> { private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) { return DefaultFilenamePolicy.constructUsingStandardParameters( - spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION); + spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false); } XmlSink(XmlIO.Write<T> spec) {
