Add header/footer support to TextIO.Write
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2355957 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2355957 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2355957 Branch: refs/heads/gearpump-runner Commit: b23559578f2a7acda477b4ebccccb6a6f7c9b03e Parents: 50c1c88 Author: Stas Levin <stasle...@gmail.com> Authored: Mon Sep 5 20:26:12 2016 +0300 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:13 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/TextIO.java | 114 ++++++++++++++++--- .../java/org/apache/beam/sdk/io/TextIOTest.java | 60 +++++++++- 2 files changed, 152 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2355957/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 242470b..c754a0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -448,7 +448,15 @@ public class TextIO { return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation(); } - // TODO: appendingNewlines, header, footer, etc. + public static Bound<String> withHeader(String header) { + return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header); + } + + public static Bound<String> withFooter(String footer) { + return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer); + } + + // TODO: appendingNewlines, etc. 
/** * A PTransform that writes a bounded PCollection to a text file (or @@ -465,6 +473,12 @@ public class TextIO { /** The suffix of each file written, combined with prefix and shardTemplate. */ private final String filenameSuffix; + /** An optional header to add to each file. */ + private final String header; + + /** An optional footer to add to each file. */ + private final String footer; + /** The Coder to use to decode each line. */ private final Coder<T> coder; @@ -478,12 +492,15 @@ public class TextIO { private final boolean validate; Bound(Coder<T> coder) { - this(null, null, "", coder, 0, DEFAULT_SHARD_TEMPLATE, true); + this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true); } - private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder, - int numShards, String shardTemplate, boolean validate) { + private Bound(String name, String filenamePrefix, String filenameSuffix, String header, + String footer, Coder<T> coder, int numShards, String shardTemplate, + boolean validate) { super(name); + this.header = header; + this.footer = footer; this.coder = coder; this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -502,7 +519,7 @@ public class TextIO { */ public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate); } @@ -516,7 +533,7 @@ public class TextIO { */ public Bound<T> withSuffix(String nameExtension) { validateOutputComponent(nameExtension); - return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards, + return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards, shardTemplate, validate); } @@ -536,7 +553,7 @@ public class TextIO { */ public Bound<T> withNumShards(int numShards) { checkArgument(numShards >= 0); - return new 
Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate); } @@ -549,7 +566,7 @@ public class TextIO { * @see ShardNameTemplate */ public Bound<T> withShardNameTemplate(String shardTemplate) { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate); } @@ -567,7 +584,8 @@ public class TextIO { * <p>Does not modify this object. */ public Bound<T> withoutSharding() { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", validate); + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "", + validate); } /** @@ -579,7 +597,7 @@ public class TextIO { * @param <X> the type of the elements of the input {@link PCollection} */ public <X> Bound<X> withCoder(Coder<X> coder) { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate); } @@ -594,7 +612,17 @@ public class TextIO { * <p>Does not modify this object. 
*/ public Bound<T> withoutValidation() { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + shardTemplate, false); + } + + public Bound<T> withHeader(String header) { + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + shardTemplate, false); + } + + public Bound<T> withFooter(String footer) { + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, false); } @@ -607,7 +635,8 @@ public class TextIO { org.apache.beam.sdk.io.Write.Bound<T> write = org.apache.beam.sdk.io.Write.to( - new TextSink<>(filenamePrefix, filenameSuffix, shardTemplate, coder)); + new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate, + coder)); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -629,7 +658,11 @@ public class TextIO { .addIfNotDefault(DisplayData.item("validation", validate) .withLabel("Validation Enabled"), true) .addIfNotDefault(DisplayData.item("numShards", numShards) - .withLabel("Maximum Output Shards"), 0); + .withLabel("Maximum Output Shards"), 0) + .addIfNotNull(DisplayData.item("fileHeader", header) + .withLabel("Output file header")) + .addIfNotNull(DisplayData.item("fileFooter", footer) + .withLabel("Output file footer")); } /** @@ -664,6 +697,14 @@ public class TextIO { return coder; } + public String getHeader() { + return header; + } + + public String getFooter() { + return footer; + } + public boolean needsValidation() { return validate; } @@ -946,17 +987,22 @@ public class TextIO { @VisibleForTesting static class TextSink<T> extends FileBasedSink<T> { private final Coder<T> coder; + private final String header; + private final String footer; @VisibleForTesting TextSink( - String baseOutputFilename, String extension, String fileNameTemplate, Coder<T> coder) { + String baseOutputFilename, String extension, 
String header, String footer, + String fileNameTemplate, Coder<T> coder) { super(baseOutputFilename, extension, fileNameTemplate); this.coder = coder; + this.header = header; + this.footer = footer; } @Override public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new TextWriteOperation<>(this, coder); + return new TextWriteOperation<>(this, coder, header, footer); } /** @@ -965,15 +1011,19 @@ public class TextIO { */ private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> { private final Coder<T> coder; + private final String header; + private final String footer; - private TextWriteOperation(TextSink<T> sink, Coder<T> coder) { + private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) { super(sink); this.coder = coder; + this.header = header; + this.footer = footer; } @Override public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { - return new TextWriter<>(this, coder); + return new TextWriter<>(this, coder, header, footer); } } @@ -984,20 +1034,50 @@ public class TextIO { private static class TextWriter<T> extends FileBasedWriter<T> { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); private final Coder<T> coder; + private final String header; + private final String footer; private OutputStream out; public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) { + this(writeOperation, coder, null, null); + } + + public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) { + this(writeOperation, coder, header, null); + } + + public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header, + String footer) { super(writeOperation); + this.header = header; + this.footer = footer; this.mimeType = MimeTypes.TEXT; this.coder = coder; } + private void writeLine(String line) throws IOException { + if (line != null) { + 
out.write(line.getBytes(StandardCharsets.UTF_8)); + out.write(NEWLINE); + } + } + @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { out = Channels.newOutputStream(channel); } @Override + protected void writeHeader() throws Exception { + writeLine(header); + } + + @Override + protected void writeFooter() throws Exception { + writeLine(footer); + } + + @Override public void write(T value) throws Exception { coder.encode(value, out, Context.OUTER); out.write(NEWLINE); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2355957/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 8f94766..2ab2683 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -34,6 +34,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; @@ -48,6 +50,7 @@ import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.zip.GZIPOutputStream; @@ -101,6 +104,9 @@ import org.mockito.stubbing.Answer; @SuppressWarnings("unchecked") public class TextIOTest { + private static final String MY_HEADER = "myHeader"; + private static final String MY_FOOTER = "myFooter"; + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule @@ -216,10 +222,19 @@ public class TextIOTest { } <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception { - 
runTestWrite(elems, coder, 1); + runTestWrite(elems, null, null, coder, 1); } <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception { + runTestWrite(elems, null, null, coder, numShards); + } + + <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception { + runTestWrite(elems, header, footer, coder, 1); + } + + <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards) + throws Exception { String outputName = "file.txt"; String baseFilename = tmpFolder.newFile(outputName).getPath(); @@ -235,6 +250,8 @@ public class TextIOTest { } else { write = TextIO.Write.to(baseFilename).withCoder(coder); } + write = write.withHeader(header).withFooter(footer); + if (numShards == 1) { write = write.withoutSharding(); } else if (numShards > 0) { @@ -244,11 +261,14 @@ public class TextIOTest { p.run(); - assertOutputFiles(elems, coder, numShards, tmpFolder, outputName, write.getShardNameTemplate()); + assertOutputFiles(elems, header, footer, coder, numShards, tmpFolder, outputName, + write.getShardNameTemplate()); } public static <T> void assertOutputFiles( T[] elems, + String header, + String footer, Coder<T> coder, int numShards, TemporaryFolder rootLocation, @@ -284,15 +304,23 @@ public class TextIOTest { } } - String[] expected = new String[elems.length]; + LinkedList<String> expected = Lists.newLinkedList(); + for (int i = 0; i < elems.length; i++) { T elem = elems[i]; byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem); String line = new String(encodedElem); - expected[i] = line; + expected.add(line); + } + + if (header != null) { + expected.addFirst(header); + } + if (footer != null) { + expected.addLast(footer); } - assertThat(actual, containsInAnyOrder(expected)); + assertThat(actual, containsInAnyOrder(expected.toArray())); } @Test @@ -332,18 +360,40 @@ public class TextIOTest { } @Test + @Category(NeedsRunner.class) + public void testWriteWithHeader() throws 
Exception {
+    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, null);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithFooter() throws Exception {
+    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), null, MY_FOOTER);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithHeaderAndFooter() throws Exception {
+    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, MY_FOOTER);
+  }
+
+  @Test
   public void testWriteDisplayData() {
     TextIO.Write.Bound<?> write = TextIO.Write
         .to("foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
         .withNumShards(100)
+        .withFooter("myFooter")
+        .withHeader("myHeader")
         .withoutValidation();
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
     assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+    assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
+    assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
     assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("validation", false));