Repository: beam Updated Branches: refs/heads/master 1babed250 -> 0c740f436
[BEAM-59] Move IOChannelUtils.constructName to FileBasedSink Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/235a79a3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/235a79a3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/235a79a3 Branch: refs/heads/master Commit: 235a79a314336e8876bf3f50e5efc952270b8404 Parents: 1babed2 Author: Dan Halperin <[email protected]> Authored: Mon Apr 24 15:30:56 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Apr 27 10:24:48 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSink.java | 46 +++++++++++++++++- .../apache/beam/sdk/util/IOChannelUtils.java | 50 ++------------------ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +- .../apache/beam/sdk/io/FileBasedSinkTest.java | 27 ++++++++++- .../java/org/apache/beam/sdk/io/TextIOTest.java | 2 +- .../beam/sdk/util/IOChannelUtilsTest.java | 30 ------------ 6 files changed, 75 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 6ae950e..b8ad0a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -32,12 +32,16 @@ import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.file.Path; +import java.text.DecimalFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; @@ -104,6 +108,46 @@ import org.slf4j.LoggerFactory; */ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); + // Pattern that matches shard placeholders within a shard template. + private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)"); + + /** + * Constructs a fully qualified name from components. + * + * <p>The name is built from a prefix, shard template (with shard numbers + * applied), and a suffix. All components are required, but may be empty + * strings. + * + * <p>Within a shard template, repeating sequences of the letters "S" or "N" + * are replaced with the shard number, or number of shards respectively. The + * numbers are formatted with leading zeros to match the length of the + * repeated sequence of letters. + * + * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and + * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is + * produced: "output-001-of-100.txt". + */ + public static String constructName(String prefix, + String shardTemplate, String suffix, int shardNum, int numShards) { + // Matcher API works with StringBuffer, rather than StringBuilder. + StringBuffer sb = new StringBuffer(); + sb.append(prefix); + + Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate); + while (m.find()) { + boolean isShardNum = (m.group(1).charAt(0) == 'S'); + + char[] zeros = new char[m.end() - m.start()]; + Arrays.fill(zeros, '0'); + DecimalFormat df = new DecimalFormat(String.valueOf(zeros)); + String formatted = df.format(isShardNum ? shardNum : numShards); + m.appendReplacement(sb, formatted); + } + m.appendTail(sb); + + sb.append(suffix); + return sb.toString(); + } /** * Directly supported file output compression types. @@ -301,7 +345,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } String suffix = getFileExtension(extension); - String filename = IOChannelUtils.constructName( + String filename = constructName( baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(), context.getNumShards()); return filename; http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index 9d3dd23..0d91bbc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -30,8 +30,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; -import java.text.DecimalFormat; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -43,6 +41,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nonnull; +import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; @@ -54,9 +53,6 @@ public class IOChannelUtils { private static final Map<String, IOChannelFactory> FACTORY_MAP = Collections.synchronizedMap(new HashMap<String, IOChannelFactory>()); - // Pattern that matches shard placeholders within a shard template. - private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)"); - private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader(); /** @@ -201,7 +197,7 @@ public class IOChannelUtils { public static WritableByteChannel create(String prefix, String shardTemplate, String suffix, int numShards, String mimeType) throws IOException { if (numShards == 1) { - return create(constructName(prefix, shardTemplate, suffix, 0, 1), + return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1), mimeType); } @@ -213,7 +209,7 @@ public class IOChannelUtils { Set<String> outputNames = new HashSet<>(); for (int i = 0; i < numShards; i++) { String outputName = - constructName(prefix, shardTemplate, suffix, i, numShards); + FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards); if (!outputNames.add(outputName)) { throw new IllegalArgumentException( "Shard name collision detected for: " + outputName); @@ -236,46 +232,6 @@ public class IOChannelUtils { return getFactory(spec).getSizeBytes(spec); } - /** - * Constructs a fully qualified name from components. - * - * <p>The name is built from a prefix, shard template (with shard numbers - * applied), and a suffix. All components are required, but may be empty - * strings. - * - * <p>Within a shard template, repeating sequences of the letters "S" or "N" - * are replaced with the shard number, or number of shards respectively. The - * numbers are formatted with leading zeros to match the length of the - * repeated sequence of letters. - * - * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and - * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is - * produced: "output-001-of-100.txt". - */ - public static String constructName(String prefix, - String shardTemplate, String suffix, int shardNum, int numShards) { - // Matcher API works with StringBuffer, rather than StringBuilder. - StringBuffer sb = new StringBuffer(); - sb.append(prefix); - - Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate); - while (m.find()) { - boolean isShardNum = (m.group(1).charAt(0) == 'S'); - - char[] zeros = new char[m.end() - m.start()]; - Arrays.fill(zeros, '0'); - DecimalFormat df = new DecimalFormat(String.valueOf(zeros)); - String formatted = df.format(isShardNum - ? shardNum - : numShards); - m.appendReplacement(sb, formatted); - } - m.appendTail(sb); - - sb.append(suffix); - return sb.toString(); - } - private static final Pattern URI_SCHEME_PATTERN = Pattern.compile( "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*"); http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 3e1c4b8..ece7997 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -497,7 +497,7 @@ public class AvroIOTest { for (int i = 0; i < numShards; i++) { expectedFiles.add( new File( - IOChannelUtils.constructName( + FileBasedSink.constructName( outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards))); } http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 65fb8ba..fe65a83 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -17,13 +17,13 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.io.FileBasedSink.constructName; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.collect.Lists; - import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -45,7 +45,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.zip.GZIPInputStream; - import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; @@ -87,6 +86,30 @@ public class FileBasedSinkTest { return appendToTempFolder(tempDirectory); } + @Test + public void testConstructName() { + assertEquals("output-001-of-123.txt", + constructName("output", "-SSS-of-NNN", ".txt", 1, 123)); + + assertEquals("out.txt/part-00042", + constructName("out.txt", "/part-SSSSS", "", 42, 100)); + + assertEquals("out.txt", + constructName("ou", "t.t", "xt", 1, 1)); + + assertEquals("out0102shard.txt", + constructName("out", "SSNNshard", ".txt", 1, 2)); + + assertEquals("out-2/1.part-1-of-2.txt", + constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2)); + } + + @Test + public void testConstructNameWithLargeShardCount() { + assertEquals("out-100-of-5000.txt", + constructName("out", "-SS-of-NN", ".txt", 100, 5000)); + } + /** * FileBasedWriter opens the correct file, writes the header, footer, and elements in the * correct order, and returns the correct filename. http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 47b4963..b59938e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -339,7 +339,7 @@ public class TextIOTest { expectedFiles.add( new File( rootLocation.toString(), - IOChannelUtils.constructName(outputName, shardNameTemplate, "", i, numShards))); + FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java index 6dfa4c7..ea4ae87 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java @@ -28,7 +28,6 @@ import com.google.common.io.Files; import java.io.File; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -47,28 +46,6 @@ public class IOChannelUtilsTest { @Rule public ExpectedException thrown = ExpectedException.none(); - @Test - public void testShardFormatExpansion() { - assertEquals("output-001-of-123.txt", - IOChannelUtils.constructName("output", "-SSS-of-NNN", - ".txt", - 1, 123)); - - assertEquals("out.txt/part-00042", - IOChannelUtils.constructName("out.txt", "/part-SSSSS", "", - 42, 100)); - - assertEquals("out.txt", - IOChannelUtils.constructName("ou", "t.t", "xt", 1, 1)); - - assertEquals("out0102shard.txt", - IOChannelUtils.constructName("out", "SSNNshard", ".txt", 1, 2)); - - assertEquals("out-2/1.part-1-of-2.txt", - IOChannelUtils.constructName("out", "-N/S.part-S-of-N", - ".txt", 1, 2)); - } - @Test(expected = IllegalArgumentException.class) public void testShardNameCollision() throws Exception { File outFolder = tmpFolder.newFolder(); @@ -80,13 +57,6 @@ public class IOChannelUtilsTest { } @Test - public void testLargeShardCount() { - Assert.assertEquals("out-100-of-5000.txt", - IOChannelUtils.constructName("out", "-SS-of-NN", ".txt", - 100, 5000)); - } - - @Test public void testHandlerNoScheme() throws Exception { String pathToTempFolder = tmpFolder.getRoot().getAbsolutePath(); assertThat(IOChannelUtils.getFactory(pathToTempFolder), instanceOf(FileIOChannelFactory.class));
