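Revised according to code review comments: TextIO.Write headers and footers are now of the element type T and are encoded with the configured Coder instead of being plain Strings.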
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b420dbd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b420dbd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b420dbd Branch: refs/heads/gearpump-runner Commit: 1b420dbdc08c3fde53dd5ce0a56260576cee3076 Parents: b235595 Author: Stas Levin <stasle...@gmail.com> Authored: Tue Sep 6 23:22:11 2016 +0300 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:13 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/TextIO.java | 72 +++++++++++-------- .../java/org/apache/beam/sdk/io/TextIOTest.java | 73 +++++++++++++++----- 2 files changed, 98 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b420dbd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index c754a0b..eefa867 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -35,12 +36,14 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import 
org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -474,10 +477,10 @@ public class TextIO { private final String filenameSuffix; /** An optional header to add to each file. */ - private final String header; + private final T header; /** An optional footer to add to each file. */ - private final String footer; + private final T footer; /** The Coder to use to decode each line. */ private final Coder<T> coder; @@ -495,8 +498,8 @@ public class TextIO { this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true); } - private Bound(String name, String filenamePrefix, String filenameSuffix, String header, - String footer, Coder<T> coder, int numShards, String shardTemplate, + private Bound(String name, String filenamePrefix, String filenameSuffix, T header, + T footer, Coder<T> coder, int numShards, String shardTemplate, boolean validate) { super(name); this.header = header; @@ -509,6 +512,14 @@ public class TextIO { this.validate = validate; } + private String asString(T obj, Coder<T> coder) { + try { + return obj == null ? "" : new String(CoderUtils.encodeToByteArray(coder, obj)); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + /** * Returns a transform for writing to text files that's like this one but * that writes to the file(s) with the given filename prefix. @@ -594,9 +605,8 @@ public class TextIO { * the elements of the input {@link PCollection PCollection<X>} into an * output text line. Does not modify this object. 
* - * @param <X> the type of the elements of the input {@link PCollection} */ - public <X> Bound<X> withCoder(Coder<X> coder) { + public Bound<?> withCoder(Coder<? super T> coder) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate); } @@ -616,12 +626,12 @@ public class TextIO { shardTemplate, false); } - public Bound<T> withHeader(String header) { + public Bound<T> withHeader(T header) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, false); } - public Bound<T> withFooter(String footer) { + public Bound<T> withFooter(T footer) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, false); } @@ -659,10 +669,10 @@ public class TextIO { .withLabel("Validation Enabled"), true) .addIfNotDefault(DisplayData.item("numShards", numShards) .withLabel("Maximum Output Shards"), 0) - .addIfNotNull(DisplayData.item("fileHeader", header) - .withLabel("Output file header")) - .addIfNotNull(DisplayData.item("fileFooter", footer) - .withLabel("Output file footer")); + .addIfNotNull(DisplayData.item("fileHeader", asString(header, coder)) + .withLabel("File Header")) + .addIfNotNull(DisplayData.item("fileFooter", asString(footer, coder)) + .withLabel("File Footer")); } /** @@ -697,11 +707,11 @@ public class TextIO { return coder; } - public String getHeader() { + public T getHeader() { return header; } - public String getFooter() { + public T getFooter() { return footer; } @@ -987,17 +997,21 @@ public class TextIO { @VisibleForTesting static class TextSink<T> extends FileBasedSink<T> { private final Coder<T> coder; - private final String header; - private final String footer; + private final byte[] header; + private final byte[] footer; @VisibleForTesting TextSink( - String baseOutputFilename, String extension, String header, String footer, + String baseOutputFilename, String extension, T header, T footer, 
String fileNameTemplate, Coder<T> coder) { super(baseOutputFilename, extension, fileNameTemplate); this.coder = coder; - this.header = header; - this.footer = footer; + try { + this.header = header == null ? null : CoderUtils.encodeToByteArray(coder, header); + this.footer = footer == null ? null : CoderUtils.encodeToByteArray(coder, footer); + } catch (CoderException e) { + throw new RuntimeException(e); + } } @Override @@ -1011,10 +1025,10 @@ public class TextIO { */ private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> { private final Coder<T> coder; - private final String header; - private final String footer; + private final byte[] header; + private final byte[] footer; - private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) { + private TextWriteOperation(TextSink<T> sink, Coder<T> coder, byte[] header, byte[] footer) { super(sink); this.coder = coder; this.header = header; @@ -1034,20 +1048,20 @@ public class TextIO { private static class TextWriter<T> extends FileBasedWriter<T> { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); private final Coder<T> coder; - private final String header; - private final String footer; + private final byte[] header; + private final byte[] footer; private OutputStream out; public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) { this(writeOperation, coder, null, null); } - public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) { + public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header) { this(writeOperation, coder, header, null); } - public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header, - String footer) { + public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header, + byte[] footer) { super(writeOperation); this.header = header; this.footer = footer; @@ -1055,9 +1069,9 @@ 
public class TextIO { this.coder = coder; } - private void writeLine(String line) throws IOException { + private void writeLine(byte[] line) throws IOException { if (line != null) { - out.write(line.getBytes(StandardCharsets.UTF_8)); + out.write(line); out.write(NEWLINE); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b420dbd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 2ab2683..7028761 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -33,7 +33,11 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.io.BufferedReader; @@ -229,11 +233,11 @@ public class TextIOTest { runTestWrite(elems, null, null, coder, numShards); } - <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception { + <T> void runTestWrite(T[] elems, Coder<T> coder, T header, T footer) throws Exception { runTestWrite(elems, header, footer, coder, 1); } - <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards) + <T> void runTestWrite(T[] elems, T header, T footer, Coder<T> coder, int numShards) throws Exception { String outputName = "file.txt"; String baseFilename = tmpFolder.newFile(outputName).getPath(); @@ -248,7 +252,7 @@ public class TextIOTest { // T==String write = (TextIO.Write.Bound<T>) 
writeStrings; } else { - write = TextIO.Write.to(baseFilename).withCoder(coder); + write = TextIO.Write.withCoder(coder).to(baseFilename); } write = write.withHeader(header).withFooter(footer); @@ -267,9 +271,9 @@ public class TextIOTest { public static <T> void assertOutputFiles( T[] elems, - String header, - String footer, - Coder<T> coder, + final T header, + final T footer, + final Coder<T> coder, int numShards, TemporaryFolder rootLocation, String outputName, @@ -291,36 +295,69 @@ public class TextIOTest { } } - List<String> actual = new ArrayList<>(); + List<List<String>> actual = new ArrayList<>(); + for (File tmpFile : expectedFiles) { try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) { + List<String> currentFile = Lists.newArrayList(); for (;;) { String line = reader.readLine(); if (line == null) { break; } - actual.add(line); + currentFile.add(line); } + actual.add(currentFile); } } - LinkedList<String> expected = Lists.newLinkedList(); + LinkedList<String> expectedElements = Lists.newLinkedList(); for (int i = 0; i < elems.length; i++) { T elem = elems[i]; byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem); String line = new String(encodedElem); - expected.add(line); + expectedElements.add(line); } - if (header != null) { - expected.addFirst(header); - } - if (footer != null) { - expected.addLast(footer); - } - - assertThat(actual, containsInAnyOrder(expected.toArray())); + final String headerString = + header == null ? null : new String(CoderUtils.encodeToByteArray(coder, header)); + + final String footerString = + footer == null ? 
null : new String(CoderUtils.encodeToByteArray(coder, footer)); + + ArrayList<String> actualElements = + Lists.newArrayList( + Iterables.concat( + FluentIterable + .from(actual) + .transform(new Function<List<String>, List<String>>() { + @Nullable + @Override + public List<String> apply(List<String> lines) { + ArrayList<String> newLines = Lists.newArrayList(lines); + if (headerString != null) { + newLines.remove(0); + } + if (footerString != null) { + int last = newLines.size() - 1; + newLines.remove(last); + } + return newLines; + } + }) + .toList())); + + assertThat(actualElements, containsInAnyOrder(expectedElements.toArray())); + + assertTrue(Iterables.all(actual, new Predicate<List<String>>() { + @Override + public boolean apply(@Nullable List<String> fileLines) { + int last = fileLines.size() - 1; + return (headerString == null || fileLines.get(0).equals(headerString)) + && (footerString == null || fileLines.get(last).equals(footerString)); + } + })); } @Test