Repository: beam
Updated Branches:
  refs/heads/master 87499f749 -> cd813fba0
[BEAM-59] Switch mimeType from mutable protected field to constructor Protected mutable fields are a terrible design pattern Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fbb6b642 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fbb6b642 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fbb6b642 Branch: refs/heads/master Commit: fbb6b642e04101a80f6b1f1c1b9b791a736d59f0 Parents: 87499f7 Author: Dan Halperin <[email protected]> Authored: Sun Apr 30 11:48:12 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon May 1 16:34:00 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 3 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 33 ++++++++++++-------- .../java/org/apache/beam/sdk/io/TFRecordIO.java | 3 +- .../java/org/apache/beam/sdk/io/TextIO.java | 3 +- .../java/org/apache/beam/sdk/io/SimpleSink.java | 3 +- .../org/apache/beam/sdk/io/xml/XmlSink.java | 3 +- 6 files changed, 27 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index a48976f..2031569 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -1032,8 +1032,7 @@ public class AvroIO { AvroCoder<T> coder, SerializableAvroCodecFactory codec, ImmutableMap<String, Object> metadata) { - super(writeOperation); - this.mimeType = MimeTypes.BINARY; + super(writeOperation, MimeTypes.BINARY); this.coder = coder; this.codec = codec; this.metadata = metadata; 
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 253a08b..7ba608c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -156,7 +157,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { /** * No compression, or any other transformation, will be used. */ - UNCOMPRESSED("", MimeTypes.TEXT) { + UNCOMPRESSED("", null) { @Override public WritableByteChannel create(WritableByteChannel channel) throws IOException { return channel; @@ -193,9 +194,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { }; private String filenameSuffix; - private String mimeType; + @Nullable private String mimeType; - CompressionType(String suffix, String mimeType) { + CompressionType(String suffix, @Nullable String mimeType) { this.filenameSuffix = suffix; this.mimeType = mimeType; } @@ -206,7 +207,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } @Override - public String getMimeType() { + @Nullable public String getMimeType() { return mimeType; } } @@ -792,19 +793,20 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { /** * The MIME type used in the creation of the output channel (if the file system supports it). 
* - * <p>GCS, for example, supports writing files with Content-Type metadata. - * - * <p>May be overridden. Default is {@link MimeTypes#TEXT}. See {@link MimeTypes} for other - * options. + * <p>This is the default for the sink, but it may be overridden by a supplied + * {@link WritableByteChannelFactory}. For example, {@link TextIO.Write} uses + * {@link MimeTypes#TEXT} by default but if {@link CompressionType#BZIP2} is set then + * the MIME type will be overridden to {@link MimeTypes#BINARY}. */ - protected String mimeType = MimeTypes.TEXT; + private final String mimeType; /** * Construct a new FileBasedWriter with a base filename. */ - public FileBasedWriter(FileBasedWriteOperation<T> writeOperation) { + public FileBasedWriter(FileBasedWriteOperation<T> writeOperation, String mimeType) { checkNotNull(writeOperation); this.writeOperation = writeOperation; + this.mimeType = mimeType; } /** @@ -888,8 +890,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { LOG.debug("Opening {}.", filename); final WritableByteChannelFactory factory = getWriteOperation().getSink().writableByteChannelFactory; - mimeType = factory.getMimeType(); - channel = factory.create(IOChannelUtils.create(filename, mimeType)); + // The factory may force a MIME type or it may return null, indicating to use the sink's MIME. + String channelMimeType = firstNonNull(factory.getMimeType(), mimeType); + channel = factory.create(IOChannelUtils.create(filename, channelMimeType)); try { prepareWrite(channel); LOG.debug("Writing header to {}.", filename); @@ -1026,11 +1029,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { WritableByteChannel create(WritableByteChannel channel) throws IOException; /** - * @return the MIME type that should be used for the files that will hold the output data + * Returns the MIME type that should be used for the files that will hold the output data. 
May + * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change + * the MIME type (e.g., for {@link CompressionType#UNCOMPRESSED}). + * * @see MimeTypes * @see <a href= * 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a> */ + @Nullable String getMimeType(); /** http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 1d7477b..8a1870e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -603,8 +603,7 @@ public class TFRecordIO { private TFRecordCodec codec; private TFRecordWriter(FileBasedWriteOperation<byte[]> writeOperation) { - super(writeOperation); - this.mimeType = MimeTypes.BINARY; + super(writeOperation, MimeTypes.BINARY); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index d161d23..6b58391 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -1079,10 +1079,9 @@ public class TextIO { FileBasedWriteOperation<String> writeOperation, @Nullable String header, @Nullable String footer) { - super(writeOperation); + super(writeOperation, MimeTypes.TEXT); this.header = header; this.footer = footer; - this.mimeType = MimeTypes.TEXT; } /** 
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java index 8caf004..f83642a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.MimeTypes; /** * A simple FileBasedSink that writes String values as lines with header and footer lines. @@ -65,7 +66,7 @@ class SimpleSink extends FileBasedSink<String> { private WritableByteChannel channel; public SimpleWriter(SimpleWriteOperation writeOperation) { - super(writeOperation); + super(writeOperation, MimeTypes.TEXT); } private static ByteBuffer wrap(String value) throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java index 8a1621e..6f87d75 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.MimeTypes; /** Implementation of {@link XmlIO#write}. 
*/ class XmlSink<T> extends FileBasedSink<T> { @@ -111,7 +112,7 @@ class XmlSink<T> extends FileBasedSink<T> { private OutputStream os = null; public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) { - super(writeOperation); + super(writeOperation, MimeTypes.TEXT); this.marshaller = marshaller; }
