Repository: beam Updated Branches: refs/heads/master 9cd12907e -> deee5b3c2
[BEAM-2276] Add windowing into default filename policy Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e8c388a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e8c388a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e8c388a Branch: refs/heads/master Commit: 4e8c388a46636a8e5391517876ec1adb818b8d61 Parents: 9cd1290 Author: Borisa Zivkovic <[email protected]> Authored: Mon May 15 08:56:19 2017 +0100 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Fri May 26 07:35:07 2017 +0200 ---------------------------------------------------------------------- .../beam/sdk/io/DefaultFilenamePolicy.java | 151 +++++++++++++++++-- .../java/org/apache/beam/sdk/io/TextIO.java | 8 +- .../beam/sdk/io/DefaultFilenamePolicyTest.java | 57 +++++++ .../java/org/apache/beam/sdk/io/TextIOTest.java | 10 -- 4 files changed, 196 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 07bc2db..5073854 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -32,22 +32,58 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * A default {@link FilenamePolicy} for unwindowed files. This policy is constructed using three - * parameters that together define the output name of a sharded file, in conjunction with the number - * of shards and index of the particular file, using {@link #constructName}. + * A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed + * using three parameters that together define the output name of a sharded file, in conjunction + * with the number of shards, index of the particular file, current window and pane information, + * using {@link #constructName}. * - * <p>Most users of unwindowed files will use this {@link DefaultFilenamePolicy}. For more advanced + * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced * uses in generating different files for each window and other sharding controls, see the * {@code WriteOneFilePerWindow} example pipeline. */ public final class DefaultFilenamePolicy extends FilenamePolicy { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class); + /** The default sharding name template used in {@link #constructUsingStandardParameters}. */ public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; + /** The default windowed sharding name template used when writing windowed files. + * This is used as default in cases when user did not specify shard template to + * be used and there is a need to write windowed files. In cases when user does + * specify shard template to be used then provided template will be used for both + * windowed and non-windowed file names. 
+ */ + private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE = + "P-W" + DEFAULT_SHARD_TEMPLATE; + + /* + * pattern for only non-windowed file names + */ + private static final String NON_WINDOWED_ONLY_PATTERN = "S+|N+"; + + /* + * pattern for only windowed file names + */ + private static final String WINDOWED_ONLY_PATTERN = "P|W"; + + /* + * pattern for both windowed and non-windowed file names + */ + private static final String TEMPLATE_PATTERN = "(" + NON_WINDOWED_ONLY_PATTERN + "|" + + WINDOWED_ONLY_PATTERN + ")"; + // Pattern that matches shard placeholders within a shard template. - private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)"); + private static final Pattern SHARD_FORMAT_RE = Pattern.compile(TEMPLATE_PATTERN); + private static final Pattern WINDOWED_FORMAT_RE = Pattern.compile(WINDOWED_ONLY_PATTERN); /** * Constructs a new {@link DefaultFilenamePolicy}. @@ -69,7 +105,8 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { * <p>Any filename component of the provided resource will be used as the filename prefix. * * <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE} - * will be used. + * will be used for non-windowed file names and {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will + * be used for windowed file names. * * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix. */ @@ -87,6 +124,19 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { private final String shardTemplate; private final String suffix; + /* + * Checks whether given template contains enough information to form + * meaningful windowed file names - ie whether it uses pane and window + * info. + */ + static boolean isWindowedTemplate(String template){ + if (template != null){ + Matcher m = WINDOWED_FORMAT_RE.matcher(template); + return m.find(); + } + return false; + } + /** * Constructs a fully qualified name from components. 
* @@ -95,29 +145,45 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { * strings. * * <p>Within a shard template, repeating sequences of the letters "S" or "N" - * are replaced with the shard number, or number of shards respectively. The - * numbers are formatted with leading zeros to match the length of the + * are replaced with the shard number, or number of shards respectively. + * "P" is replaced by a string representation of the current pane. + * "W" is replaced by a string representation of the current window. + * + * <p>The numbers are formatted with leading zeros to match the length of the * repeated sequence of letters. * * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is * produced: "output-001-of-100.txt". */ - public static String constructName( - String prefix, String shardTemplate, String suffix, int shardNum, int numShards) { + static String constructName( + String prefix, String shardTemplate, String suffix, int shardNum, int numShards, + String paneStr, String windowStr) { // Matcher API works with StringBuffer, rather than StringBuilder. StringBuffer sb = new StringBuffer(); sb.append(prefix); Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate); while (m.find()) { - boolean isShardNum = (m.group(1).charAt(0) == 'S'); + boolean isCurrentShardNum = (m.group(1).charAt(0) == 'S'); + boolean isNumberOfShards = (m.group(1).charAt(0) == 'N'); + boolean isPane = (m.group(1).charAt(0) == 'P') && paneStr != null; + boolean isWindow = (m.group(1).charAt(0) == 'W') && windowStr != null; char[] zeros = new char[m.end() - m.start()]; Arrays.fill(zeros, '0'); DecimalFormat df = new DecimalFormat(String.valueOf(zeros)); - String formatted = df.format(isShardNum ?
shardNum : numShards); - m.appendReplacement(sb, formatted); + if (isCurrentShardNum) { + String formatted = df.format(shardNum); + m.appendReplacement(sb, formatted); + } else if (isNumberOfShards) { + String formatted = df.format(numShards); + m.appendReplacement(sb, formatted); + } else if (isPane) { + m.appendReplacement(sb, paneStr); + } else if (isWindow) { + m.appendReplacement(sb, windowStr); + } } m.appendTail(sb); @@ -125,6 +191,11 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { return sb.toString(); } + static String constructName(String prefix, String shardTemplate, String suffix, int shardNum, + int numShards) { + return constructName(prefix, shardTemplate, suffix, shardNum, numShards, null, null); + } + @Override @Nullable public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, @@ -138,9 +209,57 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { @Override public ResourceId windowedFilename(ResourceId outputDirectory, - WindowedContext c, String extension) { - throw new UnsupportedOperationException("There is no default policy for windowed file" - + " output. Please provide an explicit FilenamePolicy to generate filenames."); + WindowedContext context, String extension) { + + boolean shardTemplateProvidedByUser = !this.shardTemplate.equals(DEFAULT_SHARD_TEMPLATE); + + if (shardTemplateProvidedByUser){ + boolean isWindowed = isWindowedTemplate(this.shardTemplate); + if (!isWindowed){ + LOG.info("Template you provided {} does not have enough information to create" + + "meaningful windowed file names. Consider using P and W in your template", + this.shardTemplate); + } + } + + final PaneInfo paneInfo = context.getPaneInfo(); + String paneStr = paneInfoToString(paneInfo); + String windowStr = windowToString(context.getWindow()); + + String templateToUse = shardTemplate; + if (!shardTemplateProvidedByUser){ + LOG.info("User did not provide shard template. 
For creating windowed file names " + + "default template {} will be used", DEFAULT_WINDOWED_SHARD_TEMPLATE); + templateToUse = DEFAULT_WINDOWED_SHARD_TEMPLATE; + } + + String filename = constructName(prefix.get(), templateToUse, suffix, + context.getShardNumber(), context.getNumShards(), paneStr, windowStr) + + extension; + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + } + + /* + * Since not all windows have toString() that is nice or is compatible to be a part of file name. + */ + private String windowToString(BoundedWindow window) { + if (window instanceof GlobalWindow) { + return "GlobalWindow"; + } + if (window instanceof IntervalWindow) { + IntervalWindow iw = (IntervalWindow) window; + return String.format("IntervalWindow-%s-%s", iw.start().toString(), + iw.end().toString()); + } + return window.toString(); + } + + private String paneInfoToString(PaneInfo paneInfo){ + long currentPaneIndex = (paneInfo == null ? -1L + : paneInfo.getIndex()); + boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst()); + boolean lastPane = (paneInfo == null ? false : paneInfo.isLast()); + return String.format("pane-%s-%b-%b", currentPaneIndex, firstPane, lastPane); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 5c068ce..afb5849 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -70,8 +70,10 @@ import org.apache.beam.sdk.values.PDone; * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be * preserved. 
When producing windowed writes, the number of output shards must be set explicitly * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a - * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be - * set, and unique windows and triggers must produce unique filenames. + * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} can also be + * set in case you need better control over naming files created by unique windows. + * {@link DefaultFilenamePolicy} policy for producing unique filenames might not be appropriate + * for your use case. * * <p>Any existing files with the same names as generated output files will be overwritten. * @@ -434,8 +436,6 @@ public class TextIO { (getFilenamePolicy() == null) || (getShardTemplate() == null && getFilenameSuffix() == null), "Cannot set a filename policy and also a filename template or suffix."); - checkState(!getWindowedWrites() || (getFilenamePolicy() != null), - "When using windowed writes, a filename policy must be set via withFilenamePolicy()."); FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); if (usedFilenamePolicy == null) { http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java index c895da8..787403b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -18,7 +18,10 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName; +import static 
org.apache.beam.sdk.io.DefaultFilenamePolicy.isWindowedTemplate; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.junit.Test; import org.junit.runner.RunWith; @@ -29,6 +32,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class DefaultFilenamePolicyTest { + @Test public void testConstructName() { assertEquals("output-001-of-123.txt", @@ -52,4 +56,57 @@ public class DefaultFilenamePolicyTest { assertEquals("out-100-of-5000.txt", constructName("out", "-SS-of-NN", ".txt", 100, 5000)); } + + @Test + public void testIsWindowedTemplate(){ + assertTrue(isWindowedTemplate("-SSS-of-NNN-P-W")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-W")); + assertTrue(isWindowedTemplate("-SSS-of-NNN-P")); + assertTrue(isWindowedTemplate("W-SSS-of-NNN")); + + assertFalse(isWindowedTemplate("-SSS-of-NNN")); + assertFalse(isWindowedTemplate("-SSS-of-lp")); + } + + @Test + public void testConstructWindowedName() { + assertEquals("output-001-of-123.txt", + constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null)); + + assertEquals("output-001-of-123-PPP-W.txt", + constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null)); + + assertEquals("out.txt/part-00042-myPaneStr-myWindowStr", + constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", + "myWindowStr")); + + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2", + "anotherWindowStr")); + + assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt", + constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", + "oneMoreWindowStr")); + + assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-" + + "panemyPaneStr3.txt", + constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3", + "slidingWindow1")); + + // test first/last pane + assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false", + 
constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", + "myWindowStr")); + + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane", + "anotherWindowStr")); + + assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt", + constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", + "oneMoreWindowStr")); + + assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt", + constructName("out", + "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1")); + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 0d8fbbd..6c7a53f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -1103,15 +1103,5 @@ public class TextIOTest { SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); } - @Test - public void testWindowedWriteRequiresFilenamePolicy() { - PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of())); - TextIO.Write write = TextIO.write().to("/tmp/some/file").withWindowedWrites(); - - expectedException.expect(IllegalStateException.class); - expectedException.expectMessage( - "When using windowed writes, a filename policy must be set via withFilenamePolicy()"); - emptyInput.apply(write); - } }
