[FLINK-4737] [core] Add support for bz2 and xz compression in flink-core. Adds a dependency on 'commons-compress'.
This closes #2002 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81aec410 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81aec410 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81aec410 Branch: refs/heads/master Commit: 81aec4109adc18f4bd4d1ddb91892faed10f4f14 Parents: 954ef08 Author: Milosz Tanski <[email protected]> Authored: Wed May 18 00:04:36 2016 -0400 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 19:36:13 2016 +0200 ---------------------------------------------------------------------- docs/dev/batch/index.md | 10 ++++ flink-core/pom.xml | 10 +++- .../flink/api/common/io/FileInputFormat.java | 6 ++- .../io/compression/Bzip2InputStreamFactory.java | 50 ++++++++++++++++++++ .../compression/InflaterInputStreamFactory.java | 3 +- .../io/compression/XZInputStreamFactory.java | 49 +++++++++++++++++++ 6 files changed, 123 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/81aec410/docs/dev/batch/index.md ---------------------------------------------------------------------- diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md index 5cdc36d..0b1c9f9 100644 --- a/docs/dev/batch/index.md +++ b/docs/dev/batch/index.md @@ -1114,6 +1114,16 @@ The following table lists the currently supported compression methods. 
<td><code>.gz</code>, <code>.gzip</code></td> <td>no</td> </tr> + <tr> + <td><strong>Bzip2</strong></td> + <td><code>.bz2</code></td> + <td>no</td> + </tr> + <tr> + <td><strong>XZ</strong></td> + <td><code>.xz</code></td> + <td>no</td> + </tr> </tbody> </table> http://git-wip-us.apache.org/repos/asf/flink/blob/81aec410/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index fe24f0e..40e5a2e 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -75,9 +75,15 @@ under the License. <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> </dependency> - - <!-- test dependencies --> + <!-- Commons compression, for additional decompressors --> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.4</version> + </dependency> + + <!-- test dependencies --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils-junit</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/81aec410/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index d0f5166..1d092af 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -19,9 +19,11 @@ package org.apache.flink.api.common.io; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory; import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory; import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory; import 
org.apache.flink.api.common.io.compression.InflaterInputStreamFactory; +import org.apache.flink.api.common.io.compression.XZInputStreamFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -116,7 +118,9 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS private static void initDefaultInflaterInputStreamFactories() { InflaterInputStreamFactory<?>[] defaultFactories = { DeflateInflaterInputStreamFactory.getInstance(), - GzipInflaterInputStreamFactory.getInstance() + GzipInflaterInputStreamFactory.getInstance(), + Bzip2InputStreamFactory.getInstance(), + XZInputStreamFactory.getInstance(), }; for (InflaterInputStreamFactory<?> inputStreamFactory : defaultFactories) { for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) { http://git-wip-us.apache.org/repos/asf/flink/blob/81aec410/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java new file mode 100644 index 0000000..d204907 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.io.compression; + +import org.apache.flink.annotation.Internal; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Collections; + +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; + +@Internal +public class Bzip2InputStreamFactory implements InflaterInputStreamFactory<BZip2CompressorInputStream> { + + private static Bzip2InputStreamFactory INSTANCE = null; + + public static Bzip2InputStreamFactory getInstance() { + if (INSTANCE == null) { + INSTANCE = new Bzip2InputStreamFactory(); + } + return INSTANCE; + } + + @Override + public BZip2CompressorInputStream create(InputStream in) throws IOException { + return new BZip2CompressorInputStream(in); + } + + @Override + public Collection<String> getCommonFileExtensions() { + return Collections.singleton("bz2"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81aec410/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java index 7fbc50d..c973763 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java @@ -23,13 +23,12 @@ import org.apache.flink.annotation.Internal; import java.io.IOException; import java.io.InputStream; import java.util.Collection; -import java.util.zip.InflaterInputStream; /** * Creates a new instance of a certain subclass of {@link java.util.zip.InflaterInputStream}. */ @Internal -public interface InflaterInputStreamFactory<T extends InflaterInputStream> { +public interface InflaterInputStreamFactory<T extends InputStream> { /** * Creates a {@link java.util.zip.InflaterInputStream} that wraps the given input stream. http://git-wip-us.apache.org/repos/asf/flink/blob/81aec410/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java new file mode 100644 index 0000000..c80de40 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.io.compression; + +import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; +import org.apache.flink.annotation.Internal; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Collections; + +@Internal +public class XZInputStreamFactory implements InflaterInputStreamFactory<XZCompressorInputStream> { + + private static XZInputStreamFactory INSTANCE = null; + + public static XZInputStreamFactory getInstance() { + if (INSTANCE == null) { + INSTANCE = new XZInputStreamFactory(); + } + return INSTANCE; + } + + @Override + public XZCompressorInputStream create(InputStream in) throws IOException { + return new XZCompressorInputStream(in, true); + } + + @Override + public Collection<String> getCommonFileExtensions() { + return Collections.singleton("xz"); + } +}
