Adds a canonical Compression enum for file-based IOs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54489f0d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54489f0d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54489f0d Branch: refs/heads/master Commit: 54489f0d52e354d8233bf297cce6ce451a05f6a5 Parents: afe8b0e Author: Eugene Kirpichov <[email protected]> Authored: Fri Aug 18 16:17:20 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Aug 30 17:40:52 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroSink.java | 2 +- .../apache/beam/sdk/io/CompressedSource.java | 292 ++++++------------- .../org/apache/beam/sdk/io/Compression.java | 228 +++++++++++++++ .../org/apache/beam/sdk/io/FileBasedSink.java | 113 +++---- .../java/org/apache/beam/sdk/io/TFRecordIO.java | 153 ++++------ .../java/org/apache/beam/sdk/io/TextIO.java | 178 +++++------ .../beam/sdk/io/CompressedSourceTest.java | 17 +- .../apache/beam/sdk/io/FileBasedSinkTest.java | 41 ++- .../java/org/apache/beam/sdk/io/SimpleSink.java | 23 +- .../org/apache/beam/sdk/io/TFRecordIOTest.java | 35 ++- .../org/apache/beam/sdk/io/TextIOReadTest.java | 81 +++-- .../org/apache/beam/sdk/io/WriteFilesTest.java | 9 +- .../java/org/apache/beam/sdk/io/xml/XmlIO.java | 96 +++--- 13 files changed, 672 insertions(+), 596 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index acd3ea6..888db85 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java 
@@ -40,7 +40,7 @@ class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, Destin DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations, boolean genericRecords) { // Avro handle compression internally using the codec. - super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED); + super(outputPrefix, dynamicDestinations, Compression.UNCOMPRESSED); this.dynamicDestinations = dynamicDestinations; this.genericRecords = genericRecords; } http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 6943a02..ae55d80 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -20,28 +20,17 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.io.ByteStreams; -import com.google.common.primitives.Ints; import java.io.IOException; -import java.io.InputStream; -import java.io.PushbackInputStream; import java.io.Serializable; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.NoSuchElementException; -import java.util.zip.GZIPInputStream; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; 
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.joda.time.Instant; /** @@ -54,21 +43,20 @@ import org.joda.time.Instant; * FileBasedSource<T> mySource = ...; * PCollection<T> collection = p.apply(Read.from(CompressedSource * .from(mySource) - * .withDecompression(CompressedSource.CompressionMode.GZIP))); + * .withCompression(Compression.GZIP))); * } </pre> * - * <p>Supported compression algorithms are {@link CompressionMode#GZIP}, - * {@link CompressionMode#BZIP2}, {@link CompressionMode#ZIP} and {@link CompressionMode#DEFLATE}. - * User-defined compression types are supported by implementing + * <p>Supported compression algorithms are {@link Compression#GZIP}, + * {@link Compression#BZIP2}, {@link Compression#ZIP} and {@link Compression#DEFLATE}. + * User-defined compression types are supported by implementing a * {@link DecompressingChannelFactory}. * * <p>By default, the compression algorithm is selected from those supported in - * {@link CompressionMode} based on the file name provided to the source, namely - * {@code ".bz2"} indicates {@link CompressionMode#BZIP2}, {@code ".gz"} indicates - * {@link CompressionMode#GZIP}, {@code ".zip"} indicates {@link CompressionMode#ZIP} and - * {@code ".deflate"} indicates {@link CompressionMode#DEFLATE}. If the file name does not match - * any of the supported - * algorithms, it is assumed to be uncompressed data. + * {@link Compression} based on the file name provided to the source, namely + * {@code ".bz2"} indicates {@link Compression#BZIP2}, {@code ".gz"} indicates + * {@link Compression#GZIP}, {@code ".zip"} indicates {@link Compression#ZIP} and + * {@code ".deflate"} indicates {@link Compression#DEFLATE}. 
If the file name does not match + * any of the supported algorithms, it is assumed to be uncompressed data. * * @param <T> The type to read from the compressed file. */ @@ -85,197 +73,75 @@ public class CompressedSource<T> extends FileBasedSource<T> { throws IOException; } - /** - * Factory interface for creating channels that decompress the content of an underlying channel, - * based on both the channel and the file name. - */ - private interface FileNameBasedDecompressingChannelFactory - extends DecompressingChannelFactory { - /** - * Given a channel, create a channel that decompresses the content read from the channel. - */ - ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel) - throws IOException; - } - - /** - * Default compression types supported by the {@code CompressedSource}. - */ + /** @deprecated Use {@link Compression} instead */ + @Deprecated public enum CompressionMode implements DecompressingChannelFactory { - /** - * Reads a byte channel assuming it is compressed with gzip. - */ - GZIP { - @Override - public boolean matches(String fileName) { - return fileName.toLowerCase().endsWith(".gz"); - } - - @Override - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException { - // Determine if the input stream is gzipped. The input stream returned from the - // GCS connector may already be decompressed; GCS does this based on the - // content-encoding property. 
- PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2); - byte[] headerBytes = new byte[2]; - int bytesRead = ByteStreams.read( - stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */); - stream.unread(headerBytes, 0, bytesRead); - if (bytesRead >= 2) { - byte zero = 0x00; - int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); - if (header == GZIPInputStream.GZIP_MAGIC) { - return Channels.newChannel(new GzipCompressorInputStream(stream, true)); - } - } - return Channels.newChannel(stream); - } - }, - - /** - * Reads a byte channel assuming it is compressed with bzip2. - */ - BZIP2 { - @Override - public boolean matches(String fileName) { - return fileName.toLowerCase().endsWith(".bz2"); - } + /** @see Compression#UNCOMPRESSED */ + UNCOMPRESSED(Compression.UNCOMPRESSED), - @Override - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException { - return Channels.newChannel( - new BZip2CompressorInputStream(Channels.newInputStream(channel), true)); - } - }, + /** @see Compression#AUTO */ + AUTO(Compression.AUTO), - /** - * Reads a byte channel assuming it is compressed with zip. - * If the zip file contains multiple entries, files in the zip are concatenated all together. - */ - ZIP { - @Override - public boolean matches(String fileName) { - return fileName.toLowerCase().endsWith(".zip"); - } + /** @see Compression#GZIP */ + GZIP(Compression.GZIP), - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException { - FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel)); - return Channels.newChannel(zip); - } - }, + /** @see Compression#BZIP2 */ + BZIP2(Compression.BZIP2), - /** - * Reads a byte channel assuming it is compressed with deflate. 
- */ - DEFLATE { - @Override - public boolean matches(String fileName) { - return fileName.toLowerCase().endsWith(".deflate"); - } - - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException { - return Channels.newChannel( - new DeflateCompressorInputStream(Channels.newInputStream(channel))); - } - }; + /** @see Compression#ZIP */ + ZIP(Compression.ZIP), - /** - * Extend of {@link ZipInputStream} to automatically read all entries in the zip. - */ - private static class FullZipInputStream extends InputStream { + /** @see Compression#DEFLATE */ + DEFLATE(Compression.DEFLATE); - private ZipInputStream zipInputStream; - private ZipEntry currentEntry; - - public FullZipInputStream(InputStream is) throws IOException { - super(); - zipInputStream = new ZipInputStream(is); - currentEntry = zipInputStream.getNextEntry(); - } - - @Override - public int read() throws IOException { - int result = zipInputStream.read(); - while (result == -1) { - currentEntry = zipInputStream.getNextEntry(); - if (currentEntry == null) { - return -1; - } else { - result = zipInputStream.read(); - } - } - return result; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - int result = zipInputStream.read(b, off, len); - while (result == -1) { - currentEntry = zipInputStream.getNextEntry(); - if (currentEntry == null) { - return -1; - } else { - result = zipInputStream.read(b, off, len); - } - } - return result; - } + private Compression canonical; + CompressionMode(Compression canonical) { + this.canonical = canonical; } /** * Returns {@code true} if the given file name implies that the contents are compressed * according to the compression embodied by this factory. 
*/ - public abstract boolean matches(String fileName); + public boolean matches(String fileName) { + return canonical.matches(fileName); + } @Override - public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) - throws IOException; + public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) + throws IOException { + return canonical.readDecompressed(channel); + } /** Returns whether the file's extension matches of one of the known compression formats. */ public static boolean isCompressed(String filename) { - for (CompressionMode type : CompressionMode.values()) { - if (type.matches(filename)) { - return true; - } - } - return false; + return Compression.AUTO.isCompressed(filename); } - } - /** - * Reads a byte channel detecting compression according to the file name. If the filename - * is not any other known {@link CompressionMode}, it is presumed to be uncompressed. - */ - private static class DecompressAccordingToFilename - implements FileNameBasedDecompressingChannelFactory { + static DecompressingChannelFactory fromCanonical(Compression compression) { + switch (compression) { + case AUTO: + return AUTO; - @Override - public ReadableByteChannel createDecompressingChannel( - String fileName, ReadableByteChannel channel) throws IOException { - for (CompressionMode type : CompressionMode.values()) { - if (type.matches(fileName)) { - return type.createDecompressingChannel(channel); - } - } - // Uncompressed - return channel; - } + case UNCOMPRESSED: + return UNCOMPRESSED; - @Override - public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) { - throw new UnsupportedOperationException( - String.format("%s does not support createDecompressingChannel(%s) but only" - + " createDecompressingChannel(%s,%s)", - getClass().getSimpleName(), - String.class.getSimpleName(), - ReadableByteChannel.class.getSimpleName(), - ReadableByteChannel.class.getSimpleName())); + case GZIP: + return 
GZIP; + + case BZIP2: + return BZIP2; + + case ZIP: + return ZIP; + + case DEFLATE: + return DEFLATE; + + default: + throw new IllegalArgumentException("Unsupported compression type: " + compression); + } } } @@ -288,7 +154,7 @@ public class CompressedSource<T> extends FileBasedSource<T> { * configured via {@link CompressedSource#withDecompression}. */ public static <T> CompressedSource<T> from(FileBasedSource<T> sourceDelegate) { - return new CompressedSource<>(sourceDelegate, new DecompressAccordingToFilename()); + return new CompressedSource<>(sourceDelegate, CompressionMode.AUTO); } /** @@ -299,6 +165,11 @@ public class CompressedSource<T> extends FileBasedSource<T> { return new CompressedSource<>(this.sourceDelegate, channelFactory); } + /** Like {@link #withDecompression} but takes a canonical {@link Compression}. */ + public CompressedSource<T> withCompression(Compression compression) { + return withDecompression(CompressionMode.fromCanonical(compression)); + } + /** * Creates a {@code CompressedSource} from a delegate file based source and a decompressing * channel factory. @@ -359,10 +230,21 @@ public class CompressedSource<T> extends FileBasedSource<T> { * from the requested file name that the file is not compressed. 
*/ @Override - protected final boolean isSplittable() throws Exception { - return channelFactory instanceof FileNameBasedDecompressingChannelFactory - && !CompressionMode.isCompressed(getFileOrPatternSpec()) - && sourceDelegate.isSplittable(); + protected final boolean isSplittable() { + try { + if (!sourceDelegate.isSplittable()) { + return false; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + if (channelFactory == CompressionMode.UNCOMPRESSED) { + return true; + } + if (channelFactory == CompressionMode.AUTO) { + return !Compression.AUTO.isCompressed(getFileOrPatternSpec()); + } + return false; } /** @@ -375,10 +257,8 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ @Override protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) { - if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { - if (!CompressionMode.isCompressed(getFileOrPatternSpec())) { - return sourceDelegate.createSingleFileReader(options); - } + if (isSplittable()) { + return sourceDelegate.createSingleFileReader(options); } return new CompressedReader<T>( this, sourceDelegate.createSingleFileReader(options)); @@ -423,19 +303,19 @@ public class CompressedSource<T> extends FileBasedSource<T> { public static class CompressedReader<T> extends FileBasedReader<T> { private final FileBasedReader<T> readerDelegate; - private final CompressedSource<T> source; private final Object progressLock = new Object(); @GuardedBy("progressLock") private int numRecordsRead; @GuardedBy("progressLock") private CountingChannel channel; + private DecompressingChannelFactory channelFactory; /** * Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader. 
*/ public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) { super(source); - this.source = source; + this.channelFactory = source.getChannelFactory(); this.readerDelegate = readerDelegate; } @@ -525,14 +405,12 @@ public class CompressedSource<T> extends FileBasedSource<T> { channel = this.channel; } - if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) { - FileNameBasedDecompressingChannelFactory channelFactory = - (FileNameBasedDecompressingChannelFactory) source.getChannelFactory(); - readerDelegate.startReading(channelFactory.createDecompressingChannel( - getCurrentSource().getFileOrPatternSpec(), - channel)); + if (channelFactory == CompressionMode.AUTO) { + readerDelegate.startReading( + Compression.detect(getCurrentSource().getFileOrPatternSpec()) + .readDecompressed(channel)); } else { - readerDelegate.startReading(source.getChannelFactory().createDecompressingChannel( + readerDelegate.startReading(channelFactory.createDecompressingChannel( channel)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java new file mode 100644 index 0000000..bb40ed4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Arrays; +import java.util.List; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; +import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; + +/** Various compression types for reading/writing files. */ +public enum Compression { + /** + * When reading a file, automatically determine the compression type based on filename extension. + * Not applicable when writing files. 
+ */ + AUTO("") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) { + throw new UnsupportedOperationException( + "Must resolve compression into a concrete value before calling readDecompressed()"); + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) { + throw new UnsupportedOperationException("AUTO is applicable only to reading files"); + } + }, + + /** No compression. */ + UNCOMPRESSED("") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) { + return channel; + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) { + return channel; + } + }, + + /** GZip compression. */ + GZIP(".gz", ".gz") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException { + // Determine if the input stream is gzipped. The input stream returned from the + // GCS connector may already be decompressed; GCS does this based on the + // content-encoding property. + PushbackInputStream stream = new PushbackInputStream(Channels.newInputStream(channel), 2); + byte[] headerBytes = new byte[2]; + int bytesRead = + ByteStreams.read( + stream /* source */, headerBytes /* dest */, 0 /* offset */, 2 /* len */); + stream.unread(headerBytes, 0, bytesRead); + if (bytesRead >= 2) { + byte zero = 0x00; + int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); + if (header == GZIPInputStream.GZIP_MAGIC) { + return Channels.newChannel(new GzipCompressorInputStream(stream, true)); + } + } + return Channels.newChannel(stream); + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException { + return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true)); + } + }, + + /** BZip compression. 
*/ + BZIP2(".bz2", ".bz2") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException { + return Channels.newChannel( + new BZip2CompressorInputStream(Channels.newInputStream(channel), true)); + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException { + return Channels.newChannel( + new BZip2CompressorOutputStream(Channels.newOutputStream(channel))); + } + }, + + /** Zip compression. */ + ZIP(".zip", ".zip") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException { + FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel)); + return Channels.newChannel(zip); + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException { + throw new UnsupportedOperationException("Writing ZIP files is currently unsupported"); + } + }, + + /** Deflate compression. */ + DEFLATE(".deflate", ".deflate", ".zlib") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException { + return Channels.newChannel( + new DeflateCompressorInputStream(Channels.newInputStream(channel))); + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException { + return Channels.newChannel( + new DeflateCompressorOutputStream(Channels.newOutputStream(channel))); + } + }; + + private final String suggestedSuffix; + private final List<String> detectedSuffixes; + + Compression(String suggestedSuffix, String... 
detectedSuffixes) { + this.suggestedSuffix = suggestedSuffix; + this.detectedSuffixes = Arrays.asList(detectedSuffixes); + } + + public String getSuggestedSuffix() { + return suggestedSuffix; + } + + public boolean matches(String filename) { + for (String suffix : detectedSuffixes) { + if (filename.toLowerCase().endsWith(suffix)) { + return true; + } + } + return false; + } + + public boolean isCompressed(String filename) { + Compression compression = this; + if (compression == AUTO) { + compression = detect(filename); + } + return compression != UNCOMPRESSED; + } + + public static Compression detect(String filename) { + for (Compression value : values()) { + if (value.matches(filename)) { + return value; + } + } + return UNCOMPRESSED; + } + + public abstract ReadableByteChannel readDecompressed(ReadableByteChannel channel) + throws IOException; + + public abstract WritableByteChannel writeCompressed(WritableByteChannel channel) + throws IOException; + + /** Concatenates all {@link ZipInputStream}s contained within the zip file. 
*/ + private static class FullZipInputStream extends InputStream { + private ZipInputStream zipInputStream; + private ZipEntry currentEntry; + + public FullZipInputStream(InputStream is) throws IOException { + super(); + zipInputStream = new ZipInputStream(is); + currentEntry = zipInputStream.getNextEntry(); + } + + @Override + public int read() throws IOException { + int result = zipInputStream.read(); + while (result == -1) { + currentEntry = zipInputStream.getNextEntry(); + if (currentEntry == null) { + return -1; + } else { + result = zipInputStream.read(); + } + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int result = zipInputStream.read(b, off, len); + while (result == -1) { + currentEntry = zipInputStream.getNextEntry(); + if (currentEntry == null) { + return -1; + } else { + result = zipInputStream.read(b, off, len); + } + } + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index d618647..39f7868 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.Arrays; @@ -47,7 +46,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import 
org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -79,8 +77,6 @@ import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; -import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.joda.time.Instant; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -128,56 +124,66 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); - /** Directly supported file output compression types. */ + /** @deprecated use {@link Compression}. */ + @Deprecated public enum CompressionType implements WritableByteChannelFactory { - /** No compression, or any other transformation, will be used. */ - UNCOMPRESSED("", null) { - @Override - public WritableByteChannel create(WritableByteChannel channel) throws IOException { - return channel; - } - }, - /** Provides GZip output transformation. */ - GZIP(".gz", MimeTypes.BINARY) { - @Override - public WritableByteChannel create(WritableByteChannel channel) throws IOException { - return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true)); - } - }, - /** Provides BZip2 output transformation. */ - BZIP2(".bz2", MimeTypes.BINARY) { - @Override - public WritableByteChannel create(WritableByteChannel channel) throws IOException { - return Channels.newChannel( - new BZip2CompressorOutputStream(Channels.newOutputStream(channel))); - } - }, - /** Provides deflate output transformation. 
*/ - DEFLATE(".deflate", MimeTypes.BINARY) { - @Override - public WritableByteChannel create(WritableByteChannel channel) throws IOException { - return Channels.newChannel( - new DeflateCompressorOutputStream(Channels.newOutputStream(channel))); - } - }; + /** @see Compression#UNCOMPRESSED */ + UNCOMPRESSED(Compression.UNCOMPRESSED), - private String filenameSuffix; - @Nullable private String mimeType; + /** @see Compression#GZIP */ + GZIP(Compression.GZIP), - CompressionType(String suffix, @Nullable String mimeType) { - this.filenameSuffix = suffix; - this.mimeType = mimeType; + /** @see Compression#BZIP2 */ + BZIP2(Compression.BZIP2), + + /** @see Compression#DEFLATE */ + DEFLATE(Compression.DEFLATE); + + private Compression canonical; + + CompressionType(Compression canonical) { + this.canonical = canonical; } @Override public String getSuggestedFilenameSuffix() { - return filenameSuffix; + return canonical.getSuggestedSuffix(); } @Override @Nullable public String getMimeType() { - return mimeType; + return (canonical == Compression.UNCOMPRESSED) ? null : MimeTypes.BINARY; + } + + @Override + public WritableByteChannel create(WritableByteChannel channel) throws IOException { + return canonical.writeCompressed(channel); + } + + public static CompressionType fromCanonical(Compression canonical) { + switch(canonical) { + case AUTO: + throw new IllegalArgumentException("AUTO is not supported for writing"); + + case UNCOMPRESSED: + return UNCOMPRESSED; + + case GZIP: + return GZIP; + + case BZIP2: + return BZIP2; + + case ZIP: + throw new IllegalArgumentException("ZIP is unsupported"); + + case DEFLATE: + return DEFLATE; + + default: + throw new UnsupportedOperationException("Unsupported compression type: " + canonical); + } } } @@ -208,7 +214,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> /** * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the * underlying channel. 
The default is to not compress the output using {@link - * CompressionType#UNCOMPRESSED}. + * Compression#UNCOMPRESSED}. */ private final WritableByteChannelFactory writableByteChannelFactory; @@ -328,7 +334,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * When a sink has requested windowed or triggered output, this method will be invoked to return * the file {@link ResourceId resource} to be created given the base output directory and a * {@link OutputFileHints} containing information about the file, including a suggested - * extension (e.g. coming from {@link CompressionType}). + * extension (e.g. coming from {@link Compression}). * * <p>The policy must return unique and consistent filenames for different windows and panes. */ @@ -344,7 +350,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * When a sink has not requested windowed or triggered output, this method will be invoked to * return the file {@link ResourceId resource} to be created given the base output directory and * a {@link OutputFileHints} containing information about the file, including a suggested (e.g. - * coming from {@link CompressionType}). + * coming from {@link Compression}). * * <p>The shardNumber and numShards parameters, should be used by the policy to generate unique * and consistent filenames. @@ -375,7 +381,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> public FileBasedSink( ValueProvider<ResourceId> tempDirectoryProvider, DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) { - this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED); + this(tempDirectoryProvider, dynamicDestinations, Compression.UNCOMPRESSED); } /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. 
*/ @@ -390,6 +396,15 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> this.writableByteChannelFactory = writableByteChannelFactory; } + /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */ + @Experimental(Kind.FILESYSTEM) + public FileBasedSink( + ValueProvider<ResourceId> tempDirectoryProvider, + DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations, + Compression compression) { + this(tempDirectoryProvider, dynamicDestinations, CompressionType.fromCanonical(compression)); + } + /** Return the {@link DynamicDestinations} used. */ @SuppressWarnings("unchecked") public DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() { @@ -799,7 +814,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * * <p>This is the default for the sink, but it may be overridden by a supplied {@link * WritableByteChannelFactory}. For example, {@link TextIO.Write} uses {@link MimeTypes#TEXT} by - * default but if {@link CompressionType#BZIP2} is set then the MIME type will be overridden to + * default but if {@link Compression#BZIP2} is set then the MIME type will be overridden to * {@link MimeTypes#BINARY}. */ private final String mimeType; @@ -1134,7 +1149,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> /** * Returns the MIME type that should be used for the files that will hold the output data. May * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change - * the MIME type (e.g., for {@link CompressionType#UNCOMPRESSED}). + * the MIME type (e.g., for {@link Compression#UNCOMPRESSED}). 
* * @see MimeTypes * @see <a href= @@ -1144,7 +1159,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> String getMimeType(); /** - * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP} + * @return an optional filename suffix, eg, ".gz" is returned for {@link Compression#GZIP} */ @Nullable String getSuggestedFilenameSuffix(); http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 526c50e..ddedd00 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -64,7 +63,7 @@ public class TFRecordIO { public static Read read() { return new AutoValue_TFRecordIO_Read.Builder() .setValidate(true) - .setCompressionType(CompressionType.AUTO) + .setCompression(Compression.AUTO) .build(); } @@ -78,7 +77,7 @@ public class TFRecordIO { .setShardTemplate(null) .setFilenameSuffix(null) .setNumShards(0) - .setCompressionType(CompressionType.NONE) + .setCompression(Compression.UNCOMPRESSED) .build(); } @@ -90,7 +89,7 @@ public class TFRecordIO { abstract boolean getValidate(); - abstract CompressionType getCompressionType(); + abstract Compression getCompression(); abstract Builder toBuilder(); @@ 
-98,7 +97,7 @@ public class TFRecordIO { abstract static class Builder { abstract Builder setFilepattern(ValueProvider<String> filepattern); abstract Builder setValidate(boolean validate); - abstract Builder setCompressionType(CompressionType compressionType); + abstract Builder setCompression(Compression compression); abstract Read build(); } @@ -134,18 +133,22 @@ public class TFRecordIO { return toBuilder().setValidate(false).build(); } + /** @deprecated Use {@link #withCompression}. */ + @Deprecated + public Read withCompressionType(TFRecordIO.CompressionType compressionType) { + return withCompression(compressionType.canonical); + } + /** - * Returns a transform for reading TFRecord files that decompresses all input files - * using the specified compression type. + * Returns a transform for reading TFRecord files that decompresses all input files using the + * specified compression type. * - * <p>If no compression type is specified, the default is - * {@link TFRecordIO.CompressionType#AUTO}. - * In this mode, the compression type of the file is determined by its extension - * (e.g., {@code *.gz} is gzipped, {@code *.zlib} is zlib compressed, and all other - * extensions are uncompressed). + * <p>If no compression type is specified, the default is {@link Compression#AUTO}. In this + * mode, the compression type of the file is determined by its extension via {@link + * Compression#detect(String)}. */ - public Read withCompressionType(TFRecordIO.CompressionType compressionType) { - return toBuilder().setCompressionType(compressionType).build(); + public Read withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); } @Override @@ -174,29 +177,15 @@ public class TFRecordIO { // Helper to create a source specific to the requested compression type. 
protected FileBasedSource<byte[]> getSource() { - switch (getCompressionType()) { - case NONE: - return new TFRecordSource(getFilepattern()); - case AUTO: - return CompressedSource.from(new TFRecordSource(getFilepattern())); - case GZIP: - return - CompressedSource.from(new TFRecordSource(getFilepattern())) - .withDecompression(CompressedSource.CompressionMode.GZIP); - case ZLIB: - return - CompressedSource.from(new TFRecordSource(getFilepattern())) - .withDecompression(CompressedSource.CompressionMode.DEFLATE); - default: - throw new IllegalArgumentException("Unknown compression type: " + getCompressionType()); - } + return CompressedSource.from(new TFRecordSource(getFilepattern())) + .withCompression(getCompression()); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder - .add(DisplayData.item("compressionType", getCompressionType().toString()) + .add(DisplayData.item("compressionType", getCompression().toString()) .withLabel("Compression Type")) .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true) @@ -223,7 +212,7 @@ public class TFRecordIO { @Nullable abstract String getShardTemplate(); /** Option to indicate the output sink's compression type. Default is NONE. */ - abstract CompressionType getCompressionType(); + abstract Compression getCompression(); abstract Builder toBuilder(); @@ -237,7 +226,7 @@ public class TFRecordIO { abstract Builder setNumShards(int numShards); - abstract Builder setCompressionType(CompressionType compressionType); + abstract Builder setCompression(Compression compression); abstract Write build(); } @@ -326,15 +315,20 @@ public class TFRecordIO { return withNumShards(1).withShardNameTemplate(""); } + /** @deprecated use {@link #withCompression}. 
*/ + @Deprecated + public Write withCompressionType(CompressionType compressionType) { + return withCompression(compressionType.canonical); + } + /** * Writes to output files using the specified compression type. * - * <p>If no compression type is specified, the default is - * {@link TFRecordIO.CompressionType#NONE}. - * See {@link TFRecordIO.Read#withCompressionType} for more details. + * <p>If no compression type is specified, the default is {@link Compression#UNCOMPRESSED}. See + * {@link TFRecordIO.Read#withCompression} for more details. */ - public Write withCompressionType(CompressionType compressionType) { - return toBuilder().setCompressionType(compressionType).build(); + public Write withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); } @Override @@ -347,7 +341,7 @@ public class TFRecordIO { getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), - getCompressionType())); + getCompression())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -366,45 +360,35 @@ public class TFRecordIO { .withLabel("Output Shard Name Template")) .addIfNotDefault(DisplayData.item("numShards", getNumShards()) .withLabel("Maximum Output Shards"), 0) - .add(DisplayData.item("compressionType", getCompressionType().toString()) + .add(DisplayData.item("compressionType", getCompression().toString()) .withLabel("Compression Type")); } } - /** - * Possible TFRecord file compression types. - */ + /** @deprecated Use {@link Compression}. */ + @Deprecated public enum CompressionType { - /** - * Automatically determine the compression type based on filename extension. - */ - AUTO(""), - /** - * Uncompressed. - */ - NONE(""), - /** - * GZipped. - */ - GZIP(".gz"), - /** - * ZLIB compressed. 
- */ - ZLIB(".zlib"); + /** @see Compression#AUTO */ + AUTO(Compression.AUTO), + + /** @see Compression#UNCOMPRESSED */ + NONE(Compression.UNCOMPRESSED), + + /** @see Compression#GZIP */ + GZIP(Compression.GZIP), + + /** @see Compression#DEFLATE */ + ZLIB(Compression.DEFLATE); - private String filenameSuffix; + private Compression canonical; - CompressionType(String suffix) { - this.filenameSuffix = suffix; + CompressionType(Compression canonical) { + this.canonical = canonical; } - /** - * Determine if a given filename matches a compression type based on its extension. - * @param filename the filename to match - * @return true iff the filename ends with the compression type's known extension. - */ + /** @see Compression#matches */ public boolean matches(String filename) { - return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase()); + return canonical.matches(filename); } } @@ -419,11 +403,6 @@ public class TFRecordIO { @VisibleForTesting static class TFRecordSource extends FileBasedSource<byte[]> { @VisibleForTesting - TFRecordSource(String fileSpec) { - super(StaticValueProvider.of(fileSpec), 1L); - } - - @VisibleForTesting TFRecordSource(ValueProvider<String> fileSpec) { super(fileSpec, Long.MAX_VALUE); } @@ -452,7 +431,7 @@ public class TFRecordIO { } @Override - protected boolean isSplittable() throws Exception { + protected boolean isSplittable() { // TFRecord files are not splittable return false; } @@ -528,20 +507,13 @@ public class TFRecordIO { ValueProvider<ResourceId> outputPrefix, @Nullable String shardTemplate, @Nullable String suffix, - TFRecordIO.CompressionType compressionType) { + Compression compression) { super( outputPrefix, DynamicFileDestinations.<byte[]>constant( DefaultFilenamePolicy.fromStandardParameters( outputPrefix, shardTemplate, suffix, false)), - writableByteChannelFactory(compressionType)); - } - - private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> { - @Override - public 
ResourceId apply(ResourceId input) { - return input.getCurrentDirectory(); - } + compression); } @Override @@ -549,21 +521,6 @@ public class TFRecordIO { return new TFRecordWriteOperation(this); } - private static WritableByteChannelFactory writableByteChannelFactory( - TFRecordIO.CompressionType compressionType) { - switch (compressionType) { - case AUTO: - throw new IllegalArgumentException("Unsupported compression type AUTO"); - case NONE: - return CompressionType.UNCOMPRESSED; - case GZIP: - return CompressionType.GZIP; - case ZLIB: - return CompressionType.DEFLATE; - } - return CompressionType.UNCOMPRESSED; - } - /** A {@link WriteOperation WriteOperation} for TFRecord files. */ private static class TFRecordWriteOperation extends WriteOperation<Void, byte[]> { private TFRecordWriteOperation(TFRecordSink sink) { http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 442e4d9..76102cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -32,7 +32,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.CompressedSource.CompressionMode; import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; @@ -135,7 +134,7 @@ import org.joda.time.Duration; * PCollection<String> lines = ...; * lines.apply(TextIO.write().to("/path/to/file.txt")) * .withSuffix(".txt") - * 
.withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)); + * .withCompression(Compression.GZIP)); * }</pre> * * <p>Any existing files with the same names as generated output files will be overwritten. @@ -188,7 +187,7 @@ public class TextIO { */ public static Read read() { return new AutoValue_TextIO_Read.Builder() - .setCompressionType(CompressionType.AUTO) + .setCompression(Compression.AUTO) .setHintMatchesManyFiles(false) .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW) .build(); @@ -206,7 +205,7 @@ public class TextIO { */ public static ReadAll readAll() { return new AutoValue_TextIO_ReadAll.Builder() - .setCompressionType(CompressionType.AUTO) + .setCompression(Compression.AUTO) // 64MB is a reasonable value that allows to amortize the cost of opening files, // but is not so large as to exhaust a typical runner's maximum amount of output per // ProcessElement call. @@ -257,7 +256,7 @@ public class TextIO { @AutoValue public abstract static class Read extends PTransform<PBegin, PCollection<String>> { @Nullable abstract ValueProvider<String> getFilepattern(); - abstract CompressionType getCompressionType(); + abstract Compression getCompression(); @Nullable abstract Duration getWatchForNewFilesInterval(); @@ -273,7 +272,7 @@ public class TextIO { @AutoValue.Builder abstract static class Builder { abstract Builder setFilepattern(ValueProvider<String> filepattern); - abstract Builder setCompressionType(CompressionType compressionType); + abstract Builder setCompression(Compression compression); abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval); abstract Builder setWatchForNewFilesTerminationCondition( TerminationCondition<?, ?> condition); @@ -307,13 +306,19 @@ public class TextIO { return toBuilder().setFilepattern(filepattern).build(); } + /** @deprecated Use {@link #withCompression}. 
*/ + @Deprecated + public Read withCompressionType(TextIO.CompressionType compressionType) { + return withCompression(compressionType.canonical); + } + /** * Reads from input sources using the specified compression type. * - * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}. + * <p>If no compression type is specified, the default is {@link Compression#AUTO}. */ - public Read withCompressionType(TextIO.CompressionType compressionType) { - return toBuilder().setCompressionType(compressionType).build(); + public Read withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); } /** @@ -364,7 +369,7 @@ public class TextIO { // All other cases go through ReadAll. ReadAll readAll = readAll() - .withCompressionType(getCompressionType()) + .withCompression(getCompression()) .withEmptyMatchTreatment(getEmptyMatchTreatment()); if (getWatchForNewFilesInterval() != null) { TerminationCondition<String, ?> readAllCondition = @@ -378,37 +383,8 @@ public class TextIO { // Helper to create a source specific to the requested compression type. 
protected FileBasedSource<String> getSource() { - return wrapWithCompression( - new TextSource(getFilepattern(), getEmptyMatchTreatment()), - getCompressionType()); - } - - private static FileBasedSource<String> wrapWithCompression( - FileBasedSource<String> source, CompressionType compressionType) { - switch (compressionType) { - case UNCOMPRESSED: - return source; - case AUTO: - return CompressedSource.from(source); - case BZIP2: - return - CompressedSource.from(source) - .withDecompression(CompressionMode.BZIP2); - case GZIP: - return - CompressedSource.from(source) - .withDecompression(CompressionMode.GZIP); - case ZIP: - return - CompressedSource.from(source) - .withDecompression(CompressionMode.ZIP); - case DEFLATE: - return - CompressedSource.from(source) - .withDecompression(CompressionMode.DEFLATE); - default: - throw new IllegalArgumentException("Unknown compression type: " + compressionType); - } + return CompressedSource.from(new TextSource(getFilepattern(), getEmptyMatchTreatment())) + .withCompression(getCompression()); } @Override @@ -416,7 +392,7 @@ public class TextIO { super.populateDisplayData(builder); builder .add( - DisplayData.item("compressionType", getCompressionType().toString()) + DisplayData.item("compressionType", getCompression().toString()) .withLabel("Compression Type")) .addIfNotNull( DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern")) @@ -435,7 +411,7 @@ public class TextIO { @AutoValue public abstract static class ReadAll extends PTransform<PCollection<String>, PCollection<String>> { - abstract CompressionType getCompressionType(); + abstract Compression getCompression(); @Nullable abstract Duration getWatchForNewFilesInterval(); @@ -450,7 +426,7 @@ public class TextIO { @AutoValue.Builder abstract static class Builder { - abstract Builder setCompressionType(CompressionType compressionType); + abstract Builder setCompression(Compression compression); abstract Builder setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval); abstract Builder setWatchForNewFilesTerminationCondition( TerminationCondition<String, ?> condition); @@ -460,9 +436,19 @@ public class TextIO { abstract ReadAll build(); } - /** Same as {@link Read#withCompressionType(CompressionType)}. */ - public ReadAll withCompressionType(CompressionType compressionType) { - return toBuilder().setCompressionType(compressionType).build(); + /** @deprecated Use {@link #withCompression}. */ + @Deprecated + public ReadAll withCompressionType(TextIO.CompressionType compressionType) { + return withCompression(compressionType.canonical); + } + + /** + * Reads from input sources using the specified compression type. + * + * <p>If no compression type is specified, the default is {@link Compression#AUTO}. + */ + public ReadAll withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); } /** Same as {@link Read#withEmptyMatchTreatment}. */ @@ -499,9 +485,9 @@ public class TextIO { .apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( - new IsSplittableFn(getCompressionType()), + new IsSplittableFn(getCompression()), getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getCompressionType(), getEmptyMatchTreatment()))) + new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment()))) .setCoder(StringUtf8Coder.of()); } @@ -510,39 +496,39 @@ public class TextIO { super.populateDisplayData(builder); builder.add( - DisplayData.item("compressionType", getCompressionType().toString()) + DisplayData.item("compressionType", getCompression().toString()) .withLabel("Compression Type")); } private static class CreateTextSourceFn implements SerializableFunction<String, FileBasedSource<String>> { - private final CompressionType compressionType; + private final Compression compression; private final EmptyMatchTreatment emptyMatchTreatment; private CreateTextSourceFn( - CompressionType compressionType, EmptyMatchTreatment emptyMatchTreatment) { - 
this.compressionType = compressionType; + Compression compression, EmptyMatchTreatment emptyMatchTreatment) { + this.compression = compression; this.emptyMatchTreatment = emptyMatchTreatment; } @Override public FileBasedSource<String> apply(String input) { - return Read.wrapWithCompression( - new TextSource(StaticValueProvider.of(input), emptyMatchTreatment), compressionType); + return CompressedSource.from( + new TextSource(StaticValueProvider.of(input), emptyMatchTreatment)) + .withCompression(compression); } } private static class IsSplittableFn implements SerializableFunction<String, Boolean> { - private final CompressionType compressionType; + private final Compression compression; - private IsSplittableFn(CompressionType compressionType) { - this.compressionType = compressionType; + private IsSplittableFn(Compression compression) { + this.compression = compression; } @Override public Boolean apply(String filename) { - return compressionType == CompressionType.UNCOMPRESSED - || (compressionType == CompressionType.AUTO && !CompressionMode.isCompressed(filename)); + return !compression.isCompressed(filename); } } } @@ -811,7 +797,7 @@ public class TextIO { /** * Returns a transform for writing to text files like this one but that has the given {@link * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The - * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}. + * default value is {@link Compression#UNCOMPRESSED}. * * <p>A {@code null} value will reset the value to the default value mentioned above. */ @@ -821,6 +807,16 @@ } /** + * Returns a transform for writing to text files like this one but that compresses output using + * the given {@link Compression}. The default value is {@link Compression#UNCOMPRESSED}. 
+ */ + public TypedWrite<UserT> withCompression(Compression compression) { + checkArgument(compression != null, "compression can not be null"); + return withWritableByteChannelFactory( + FileBasedSink.CompressionType.fromCanonical(compression)); + } + + /** * Preserves windowing of input elements and writes them to files based on the element's window. * * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using @@ -1063,48 +1059,36 @@ public class TextIO { } } - /** - * Possible text file compression types. - */ + /** @deprecated Use {@link Compression}. */ + @Deprecated public enum CompressionType { - /** - * Automatically determine the compression type based on filename extension. - */ - AUTO(""), - /** - * Uncompressed (i.e., may be split). - */ - UNCOMPRESSED(""), - /** - * GZipped. - */ - GZIP(".gz"), - /** - * BZipped. - */ - BZIP2(".bz2"), - /** - * Zipped. - */ - ZIP(".zip"), - /** - * Deflate compressed. - */ - DEFLATE(".deflate"); + /** @see Compression#AUTO */ + AUTO(Compression.AUTO), + + /** @see Compression#UNCOMPRESSED */ + UNCOMPRESSED(Compression.UNCOMPRESSED), + + /** @see Compression#GZIP */ + GZIP(Compression.GZIP), + + /** @see Compression#BZIP2 */ + BZIP2(Compression.BZIP2), - private String filenameSuffix; + /** @see Compression#ZIP */ + ZIP(Compression.ZIP), - CompressionType(String suffix) { - this.filenameSuffix = suffix; + /** @see Compression#DEFLATE */ + DEFLATE(Compression.DEFLATE); + + private Compression canonical; + + CompressionType(Compression canonical) { + this.canonical = canonical; } - /** - * Determine if a given filename matches a compression type based on its extension. - * @param filename the filename to match - * @return true iff the filename ends with the compression type's known extension. 
- */ + /** @see Compression#matches */ public boolean matches(String filename) { - return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase()); + return canonical.matches(filename); } } http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index fe6f01f..352d38a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -358,7 +358,7 @@ public class CompressedSourceTest { } @Test - public void testUncompressedFileIsSplittable() throws Exception { + public void testUncompressedFileWithAutoIsSplittable() throws Exception { String baseName = "test-input"; File uncompressedFile = tmpFolder.newFile(baseName + ".bin"); @@ -370,6 +370,21 @@ public class CompressedSourceTest { SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } + + @Test + public void testUncompressedFileWithUncompressedIsSplittable() throws Exception { + String baseName = "test-input"; + + File uncompressedFile = tmpFolder.newFile(baseName + ".bin"); + Files.write(generateInput(10), uncompressedFile); + + CompressedSource<Byte> source = + CompressedSource.from(new ByteSource(uncompressedFile.getPath(), 1)) + .withDecompression(CompressionMode.UNCOMPRESSED); + assertTrue(source.isSplittable()); + SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); + } + @Test public void testGzipFileIsNotSplittable() throws Exception { String baseName = "test-input"; http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index ff30e33..fd8ad80 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -48,7 +48,6 @@ import java.util.zip.GZIPInputStream; import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; @@ -229,7 +228,7 @@ public class FileBasedSinkTest { String prefix = "file"; SimpleSink<Void> sink = SimpleSink.makeSimpleSink( - getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED); + getBaseOutputDirectory(), prefix, "", "", Compression.UNCOMPRESSED); WriteOperation<Void, String> writeOp = new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory); @@ -320,7 +319,7 @@ public class FileBasedSinkTest { SimpleSink<Void> sink = SimpleSink.makeSimpleSink( - root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED); + root, "file", ".SSSSS.of.NNNNN", ".test", Compression.UNCOMPRESSED); FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null); expected = @@ -347,7 +346,7 @@ public class FileBasedSinkTest { public void testCollidingOutputFilenames() throws IOException { ResourceId root = getBaseOutputDirectory(); SimpleSink<Void> sink = - SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED); + SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED); 
SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink); ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE); @@ -376,7 +375,7 @@ public class FileBasedSinkTest { ResourceId root = getBaseOutputDirectory(); SimpleSink<Void> sink = SimpleSink.makeSimpleSink( - root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED); + root, "file", "-SSSSS-of-NNNNN", "", Compression.UNCOMPRESSED); FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null); expected = @@ -398,11 +397,11 @@ public class FileBasedSinkTest { assertEquals(expected, actual); } - /** {@link CompressionType#BZIP2} correctly writes BZip2 data. */ + /** {@link Compression#BZIP2} correctly writes BZip2 data. */ @Test - public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException { + public void testCompressionBZIP2() throws FileNotFoundException, IOException { final File file = - writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123"); + writeValuesWithCompression(Compression.BZIP2, "abc", "123"); // Read Bzip2ed data back in using Apache commons API (de facto standard). assertReadValues( new BufferedReader( @@ -413,10 +412,10 @@ public class FileBasedSinkTest { "123"); } - /** {@link CompressionType#GZIP} correctly writes Gzipped data. */ + /** {@link Compression#GZIP} correctly writes Gzipped data. */ @Test - public void testCompressionTypeGZIP() throws FileNotFoundException, IOException { - final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123"); + public void testCompressionGZIP() throws FileNotFoundException, IOException { + final File file = writeValuesWithCompression(Compression.GZIP, "abc", "123"); // Read Gzipped data back in using standard API. assertReadValues( new BufferedReader( @@ -426,11 +425,11 @@ public class FileBasedSinkTest { "123"); } - /** {@link CompressionType#DEFLATE} correctly writes deflate data. 
*/ + /** {@link Compression#DEFLATE} correctly writes deflate data. */ @Test - public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException { + public void testCompressionDEFLATE() throws FileNotFoundException, IOException { final File file = - writeValuesWithWritableByteChannelFactory(CompressionType.DEFLATE, "abc", "123"); + writeValuesWithCompression(Compression.DEFLATE, "abc", "123"); // Read Gzipped data back in using standard API. assertReadValues( new BufferedReader( @@ -441,11 +440,11 @@ public class FileBasedSinkTest { "123"); } - /** {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. */ + /** {@link Compression#UNCOMPRESSED} correctly writes uncompressed data. */ @Test - public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException { + public void testCompressionUNCOMPRESSED() throws FileNotFoundException, IOException { final File file = - writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123"); + writeValuesWithCompression(Compression.UNCOMPRESSED, "abc", "123"); // Read uncompressed data back in using standard API. assertReadValues( new BufferedReader( @@ -462,11 +461,11 @@ public class FileBasedSinkTest { } } - private File writeValuesWithWritableByteChannelFactory( - final WritableByteChannelFactory factory, String... values) throws IOException { + private File writeValuesWithCompression( + Compression compression, String... values) throws IOException { final File file = tmpFolder.newFile("test.gz"); final WritableByteChannel channel = - factory.create(Channels.newChannel(new FileOutputStream(file))); + compression.writeCompressed(Channels.newChannel(new FileOutputStream(file))); for (String value : values) { channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8))); } @@ -512,7 +511,7 @@ public class FileBasedSinkTest { /** Build a SimpleSink with default options. 
*/ private SimpleSink<Void> buildSink() { return SimpleSink.makeSimpleSink( - getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED); + getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", Compression.UNCOMPRESSED); } /** Build a SimpleWriteOperation with default options and the given temporary directory. */ http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java index 382898d..b59876f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -36,12 +36,19 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory); } + public SimpleSink( + ResourceId tempDirectory, + DynamicDestinations<String, DestinationT, String> dynamicDestinations, + Compression compression) { + super(StaticValueProvider.of(tempDirectory), dynamicDestinations, compression); + } + public static SimpleSink<Void> makeSimpleSink( ResourceId tempDirectory, FilenamePolicy filenamePolicy) { return new SimpleSink<>( tempDirectory, DynamicFileDestinations.<String>constant(filenamePolicy), - CompressionType.UNCOMPRESSED); + Compression.UNCOMPRESSED); } public static SimpleSink<Void> makeSimpleSink( @@ -61,6 +68,20 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory); } + public static SimpleSink<Void> makeSimpleSink( + ResourceId baseDirectory, + String prefix, + String shardTemplate, + String suffix, + Compression compression) { + return 
makeSimpleSink( + baseDirectory, + prefix, + shardTemplate, + suffix, + FileBasedSink.CompressionType.fromCanonical(compression)); + } + @Override public SimpleWriteOperation<DestinationT> createWriteOperation() { return new SimpleWriteOperation<>(this); http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java index d564d3b..6e5e4da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java @@ -17,11 +17,10 @@ */ package org.apache.beam.sdk.io; -import static org.apache.beam.sdk.io.TFRecordIO.CompressionType; -import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.AUTO; -import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.GZIP; -import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.NONE; -import static org.apache.beam.sdk.io.TFRecordIO.CompressionType.ZLIB; +import static org.apache.beam.sdk.io.Compression.AUTO; +import static org.apache.beam.sdk.io.Compression.DEFLATE; +import static org.apache.beam.sdk.io.Compression.GZIP; +import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.isIn; import static org.junit.Assert.assertEquals; @@ -144,7 +143,7 @@ public class TFRecordIOTest { public void testReadDisplayData() { TFRecordIO.Read read = TFRecordIO.read() .from("foo.*") - .withCompressionType(GZIP) + .withCompression(GZIP) .withoutValidation(); DisplayData displayData = DisplayData.from(read); @@ -161,7 +160,7 @@ public class TFRecordIOTest { .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") 
.withNumShards(100) - .withCompressionType(GZIP); + .withCompression(GZIP); DisplayData displayData = DisplayData.from(write); @@ -265,25 +264,25 @@ public class TFRecordIOTest { @Test @Category(NeedsRunner.class) public void runTestRoundTrip() throws IOException { - runTestRoundTrip(LARGE, 10, ".tfrecords", NONE, NONE); + runTestRoundTrip(LARGE, 10, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED); } @Test @Category(NeedsRunner.class) public void runTestRoundTripWithEmptyData() throws IOException { - runTestRoundTrip(EMPTY, 10, ".tfrecords", NONE, NONE); + runTestRoundTrip(EMPTY, 10, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED); } @Test @Category(NeedsRunner.class) public void runTestRoundTripWithOneShards() throws IOException { - runTestRoundTrip(LARGE, 1, ".tfrecords", NONE, NONE); + runTestRoundTrip(LARGE, 1, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED); } @Test @Category(NeedsRunner.class) public void runTestRoundTripWithSuffix() throws IOException { - runTestRoundTrip(LARGE, 10, ".suffix", NONE, NONE); + runTestRoundTrip(LARGE, 10, ".suffix", UNCOMPRESSED, UNCOMPRESSED); } @Test @@ -295,13 +294,13 @@ public class TFRecordIOTest { @Test @Category(NeedsRunner.class) public void runTestRoundTripZlib() throws IOException { - runTestRoundTrip(LARGE, 10, ".tfrecords", ZLIB, ZLIB); + runTestRoundTrip(LARGE, 10, ".tfrecords", DEFLATE, DEFLATE); } @Test @Category(NeedsRunner.class) public void runTestRoundTripUncompressedFilesWithAuto() throws IOException { - runTestRoundTrip(LARGE, 10, ".tfrecords", NONE, AUTO); + runTestRoundTrip(LARGE, 10, ".tfrecords", UNCOMPRESSED, AUTO); } @Test @@ -313,14 +312,14 @@ public class TFRecordIOTest { @Test @Category(NeedsRunner.class) public void runTestRoundTripZlibFilesWithAuto() throws IOException { - runTestRoundTrip(LARGE, 10, ".tfrecords", ZLIB, AUTO); + runTestRoundTrip(LARGE, 10, ".tfrecords", DEFLATE, AUTO); } private void runTestRoundTrip(Iterable<String> elems, int numShards, String suffix, - CompressionType writeCompressionType, - 
CompressionType readCompressionType) throws IOException { + Compression writeCompression, + Compression readCompression) throws IOException { String outputName = "file"; Path baseDir = Files.createTempDirectory(tempFolder, "test-rt"); String baseFilename = baseDir.resolve(outputName).toString(); @@ -328,14 +327,14 @@ public class TFRecordIOTest { TFRecordIO.Write write = TFRecordIO.write().to(baseFilename) .withNumShards(numShards) .withSuffix(suffix) - .withCompressionType(writeCompressionType); + .withCompression(writeCompression); p.apply(Create.of(elems).withCoder(StringUtf8Coder.of())) .apply(ParDo.of(new StringToByteArray())) .apply(write); p.run(); TFRecordIO.Read read = TFRecordIO.read().from(baseFilename + "*") - .withCompressionType(readCompressionType); + .withCompression(readCompression); PCollection<String> output = p2.apply(read).apply(ParDo.of(new ByteArrayToString())); PAssert.that(output).containsInAnyOrder(elems);
