This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f220277 Revert "[FLINK-20130][core] Add ZStandard to FileInputFormat"
f220277 is described below
commit f220277c907852c028150a26fb5aff55dcff9d57
Author: Xintong Song <[email protected]>
AuthorDate: Tue Mar 16 12:47:29 2021 +0800
Revert "[FLINK-20130][core] Add ZStandard to FileInputFormat"
This reverts commit 889b3845217141b295eb2b60c3dd8a2c245b429a.
---
docs/content.zh/docs/dev/dataset/overview.md | 1 -
.../flink/api/common/io/FileInputFormat.java | 2 -
.../compression/ZStandardInputStreamFactory.java | 50 ------------------
.../api/common/io/GenericCsvInputFormatTest.java | 59 ----------------------
4 files changed, 112 deletions(-)
diff --git a/docs/content.zh/docs/dev/dataset/overview.md
b/docs/content.zh/docs/dev/dataset/overview.md
index 8faee27..ac4f7ca 100644
--- a/docs/content.zh/docs/dev/dataset/overview.md
+++ b/docs/content.zh/docs/dev/dataset/overview.md
@@ -1008,7 +1008,6 @@ The following table lists the currently supported
compression methods.
| GZip | .gz, .gzip | no |
| Bzip2 | .bz2 | no |
| XZ | .xz | no |
-| ZStandart | .zst | no |
## Data Sinks
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 91bb9e8..318a41e 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -24,7 +24,6 @@ import
org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFact
import
org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.api.common.io.compression.XZInputStreamFactory;
-import org.apache.flink.api.common.io.compression.ZStandardInputStreamFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -121,7 +120,6 @@ public abstract class FileInputFormat<OT> extends
RichInputFormat<OT, FileInputS
GzipInflaterInputStreamFactory.getInstance(),
Bzip2InputStreamFactory.getInstance(),
XZInputStreamFactory.getInstance(),
- ZStandardInputStreamFactory.getInstance()
};
for (InflaterInputStreamFactory<?> inputStreamFactory :
defaultFactories) {
for (String fileExtension :
inputStreamFactory.getCommonFileExtensions()) {
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/ZStandardInputStreamFactory.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/ZStandardInputStreamFactory.java
deleted file mode 100644
index ed00cdb..0000000
---
a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/ZStandardInputStreamFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.io.compression;
-
-import org.apache.flink.annotation.Internal;
-
-import
org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Collections;
-
-/** Factory for ZStandard decompressors. */
-@Internal
-public class ZStandardInputStreamFactory
- implements InflaterInputStreamFactory<ZstdCompressorInputStream> {
-
- private static final ZStandardInputStreamFactory INSTANCE = new
ZStandardInputStreamFactory();
-
- public static ZStandardInputStreamFactory getInstance() {
- return INSTANCE;
- }
-
- @Override
- public ZstdCompressorInputStream create(InputStream in) throws IOException
{
- return new ZstdCompressorInputStream(in);
- }
-
- @Override
- public Collection<String> getCommonFileExtensions() {
- return Collections.singleton("zst");
- }
-}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index 4918040..da764f0 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
-import
org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
@@ -206,46 +205,6 @@ public class GenericCsvInputFormatTest {
}
@Test
- public void testReadNoPosAllZStandard() throws IOException {
- try {
- final String fileContent =
"111|222|333|444|555\n666|777|888|999|000|";
- final FileInputSplit split = createTempZStandardFile(fileContent);
-
- final Configuration parameters = new Configuration();
-
- format.setFieldDelimiter("|");
- format.setFieldTypesGeneric(
- IntValue.class, IntValue.class, IntValue.class,
IntValue.class, IntValue.class);
-
- format.configure(parameters);
- format.open(split);
-
- Value[] values = createIntValues(5);
-
- values = format.nextRecord(values);
- assertNotNull(values);
- assertEquals(111, ((IntValue) values[0]).getValue());
- assertEquals(222, ((IntValue) values[1]).getValue());
- assertEquals(333, ((IntValue) values[2]).getValue());
- assertEquals(444, ((IntValue) values[3]).getValue());
- assertEquals(555, ((IntValue) values[4]).getValue());
-
- values = format.nextRecord(values);
- assertNotNull(values);
- assertEquals(666, ((IntValue) values[0]).getValue());
- assertEquals(777, ((IntValue) values[1]).getValue());
- assertEquals(888, ((IntValue) values[2]).getValue());
- assertEquals(999, ((IntValue) values[3]).getValue());
- assertEquals(000, ((IntValue) values[4]).getValue());
-
- assertNull(format.nextRecord(values));
- assertTrue(format.reachedEnd());
- } catch (Exception ex) {
- fail("Test failed due to a " + ex.getClass().getSimpleName() + ":
" + ex.getMessage());
- }
- }
-
- @Test
public void testReadNoPosFirstN() throws IOException {
try {
final String fileContent =
"111|222|333|444|555|\n666|777|888|999|000|";
@@ -842,24 +801,6 @@ public class GenericCsvInputFormatTest {
new String[] {"localhost"});
}
- private FileInputSplit createTempZStandardFile(String content) throws
IOException {
- File tempFile = File.createTempFile("test_contents", "tmp.zst");
- tempFile.deleteOnExit();
-
- DataOutputStream dos =
- new DataOutputStream(
- new ZstdCompressorOutputStream(new
FileOutputStream(tempFile)));
- dos.writeBytes(content);
- dos.close();
-
- return new FileInputSplit(
- 0,
- new Path(tempFile.toURI().toString()),
- 0,
- tempFile.length(),
- new String[] {"localhost"});
- }
-
private Value[] createIntValues(int num) {
Value[] v = new Value[num];