Revert the TextIO write header and footer back to type String.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/092a1870 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/092a1870 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/092a1870 Branch: refs/heads/gearpump-runner Commit: 092a1870fc84067ae0e19a736a37160a9a55c653 Parents: 1b420db Author: Stas Levin <stasle...@gmail.com> Authored: Wed Sep 7 09:57:17 2016 +0300 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:14 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/TextIO.java | 68 ++++++++------------ .../java/org/apache/beam/sdk/io/TextIOTest.java | 68 +++++++++++--------- 2 files changed, 63 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/092a1870/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index eefa867..0895123 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; - import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -36,14 +35,12 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import 
org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -477,10 +474,10 @@ public class TextIO { private final String filenameSuffix; /** An optional header to add to each file. */ - private final T header; + private final String header; /** An optional footer to add to each file. */ - private final T footer; + private final String footer; /** The Coder to use to decode each line. */ private final Coder<T> coder; @@ -498,8 +495,8 @@ public class TextIO { this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true); } - private Bound(String name, String filenamePrefix, String filenameSuffix, T header, - T footer, Coder<T> coder, int numShards, String shardTemplate, + private Bound(String name, String filenamePrefix, String filenameSuffix, String header, + String footer, Coder<T> coder, int numShards, String shardTemplate, boolean validate) { super(name); this.header = header; @@ -512,14 +509,6 @@ public class TextIO { this.validate = validate; } - private String asString(T obj, Coder<T> coder) { - try { - return obj == null ? "" : new String(CoderUtils.encodeToByteArray(coder, obj)); - } catch (CoderException e) { - throw new RuntimeException(e); - } - } - /** * Returns a transform for writing to text files that's like this one but * that writes to the file(s) with the given filename prefix. @@ -605,8 +594,9 @@ public class TextIO { * the elements of the input {@link PCollection PCollection<X>} into an * output text line. Does not modify this object. * + * @param <X> the type of the elements of the input {@link PCollection} */ - public Bound<?> withCoder(Coder<? 
super T> coder) { + public <X> Bound<X> withCoder(Coder<X> coder) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate); } @@ -626,12 +616,12 @@ public class TextIO { shardTemplate, false); } - public Bound<T> withHeader(T header) { + public Bound<T> withHeader(String header) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, false); } - public Bound<T> withFooter(T footer) { + public Bound<T> withFooter(String footer) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, false); } @@ -669,9 +659,9 @@ public class TextIO { .withLabel("Validation Enabled"), true) .addIfNotDefault(DisplayData.item("numShards", numShards) .withLabel("Maximum Output Shards"), 0) - .addIfNotNull(DisplayData.item("fileHeader", asString(header, coder)) + .addIfNotNull(DisplayData.item("fileHeader", header) .withLabel("File Header")) - .addIfNotNull(DisplayData.item("fileFooter", asString(footer, coder)) + .addIfNotNull(DisplayData.item("fileFooter", footer) .withLabel("File Footer")); } @@ -707,11 +697,11 @@ public class TextIO { return coder; } - public T getHeader() { + public String getHeader() { return header; } - public T getFooter() { + public String getFooter() { return footer; } @@ -997,21 +987,17 @@ public class TextIO { @VisibleForTesting static class TextSink<T> extends FileBasedSink<T> { private final Coder<T> coder; - private final byte[] header; - private final byte[] footer; + private final String header; + private final String footer; @VisibleForTesting TextSink( - String baseOutputFilename, String extension, T header, T footer, + String baseOutputFilename, String extension, String header, String footer, String fileNameTemplate, Coder<T> coder) { super(baseOutputFilename, extension, fileNameTemplate); this.coder = coder; - try { - this.header = header == null ? 
null : CoderUtils.encodeToByteArray(coder, header); - this.footer = footer == null ? null : CoderUtils.encodeToByteArray(coder, footer); - } catch (CoderException e) { - throw new RuntimeException(e); - } + this.header = header; + this.footer = footer; } @Override @@ -1025,10 +1011,10 @@ public class TextIO { */ private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> { private final Coder<T> coder; - private final byte[] header; - private final byte[] footer; + private final String header; + private final String footer; - private TextWriteOperation(TextSink<T> sink, Coder<T> coder, byte[] header, byte[] footer) { + private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) { super(sink); this.coder = coder; this.header = header; @@ -1048,20 +1034,20 @@ public class TextIO { private static class TextWriter<T> extends FileBasedWriter<T> { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); private final Coder<T> coder; - private final byte[] header; - private final byte[] footer; + private final String header; + private final String footer; private OutputStream out; public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) { this(writeOperation, coder, null, null); } - public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header) { + public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) { this(writeOperation, coder, header, null); } - public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header, - byte[] footer) { + public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header, + String footer) { super(writeOperation); this.header = header; this.footer = footer; @@ -1069,9 +1055,9 @@ public class TextIO { this.coder = coder; } - private void writeLine(byte[] line) throws IOException { + private void writeLine(String line) throws IOException { if 
(line != null) { - out.write(line); + out.write(line.getBytes(StandardCharsets.UTF_8)); out.write(NEWLINE); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/092a1870/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 7028761..c60b735 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -233,11 +233,11 @@ public class TextIOTest { runTestWrite(elems, null, null, coder, numShards); } - <T> void runTestWrite(T[] elems, Coder<T> coder, T header, T footer) throws Exception { + <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception { runTestWrite(elems, header, footer, coder, 1); } - <T> void runTestWrite(T[] elems, T header, T footer, Coder<T> coder, int numShards) + <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards) throws Exception { String outputName = "file.txt"; String baseFilename = tmpFolder.newFile(outputName).getPath(); @@ -252,7 +252,7 @@ public class TextIOTest { // T==String write = (TextIO.Write.Bound<T>) writeStrings; } else { - write = TextIO.Write.withCoder(coder).to(baseFilename); + write = TextIO.Write.to(baseFilename).withCoder(coder); } write = write.withHeader(header).withFooter(footer); @@ -271,9 +271,9 @@ public class TextIOTest { public static <T> void assertOutputFiles( T[] elems, - final T header, - final T footer, - final Coder<T> coder, + final String header, + final String footer, + Coder<T> coder, int numShards, TemporaryFolder rootLocation, String outputName, @@ -320,44 +320,48 @@ public class TextIOTest { expectedElements.add(line); } - final String headerString = - header == null ? 
null : new String(CoderUtils.encodeToByteArray(coder, header)); - - final String footerString = - footer == null ? null : new String(CoderUtils.encodeToByteArray(coder, footer)); - ArrayList<String> actualElements = Lists.newArrayList( Iterables.concat( FluentIterable .from(actual) - .transform(new Function<List<String>, List<String>>() { - @Nullable - @Override - public List<String> apply(List<String> lines) { - ArrayList<String> newLines = Lists.newArrayList(lines); - if (headerString != null) { - newLines.remove(0); - } - if (footerString != null) { - int last = newLines.size() - 1; - newLines.remove(last); - } - return newLines; - } - }) + .transform(removeHeaderAndFooter(header, footer)) .toList())); assertThat(actualElements, containsInAnyOrder(expectedElements.toArray())); - assertTrue(Iterables.all(actual, new Predicate<List<String>>() { + assertTrue(Iterables.all(actual, haveProperHeaderAndFooter(header, footer))); + } + + private static Function<List<String>, List<String>> removeHeaderAndFooter(final String header, + final String footer) { + return new Function<List<String>, List<String>>() { + @Nullable + @Override + public List<String> apply(List<String> lines) { + ArrayList<String> newLines = Lists.newArrayList(lines); + if (header != null) { + newLines.remove(0); + } + if (footer != null) { + int last = newLines.size() - 1; + newLines.remove(last); + } + return newLines; + } + }; + } + + private static Predicate<List<String>> haveProperHeaderAndFooter(final String header, + final String footer) { + return new Predicate<List<String>>() { @Override - public boolean apply(@Nullable List<String> fileLines) { + public boolean apply(List<String> fileLines) { int last = fileLines.size() - 1; - return (headerString == null || fileLines.get(0).equals(headerString)) - && (footerString == null || fileLines.get(last).equals(footerString)); + return (header == null || fileLines.get(0).equals(header)) + && (footer == null || fileLines.get(last).equals(footer)); } - 
})); + }; } @Test