[
https://issues.apache.org/jira/browse/PARQUET-2196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17611441#comment-17611441
]
ASF GitHub Bot commented on PARQUET-2196:
-----------------------------------------
wgtmac commented on code in PR #1000:
URL: https://github.com/apache/parquet-mr/pull/1000#discussion_r984349672
##########
parquet-cli/src/main/java/org/apache/parquet/cli/Util.java:
##########
@@ -151,6 +151,8 @@ public static String shortCodec(CompressionCodecName codec) {
return "B";
case LZ4:
return "4";
+ case LZ4_RAW:
+ return "4";
Review Comment:
In fact, they are the same compression, except that LZ4_HADOOP adds an extra
8 bytes for the uncompressed size and the compressed size. Check here for reference:
https://github.com/airlift/aircompressor/blob/master/src/main/java/io/airlift/compress/lz4/HadoopLz4OutputStream.java#L97
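For illustration only (this is not parquet-mr or aircompressor code, and the `frame`/`unframe` names are hypothetical): the difference described above is just an 8-byte big-endian size prefix that the Hadoop LZ4 framing adds in front of the raw LZ4 block.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: the Hadoop LZ4 codec prepends two 4-byte big-endian
// integers -- the uncompressed size and the compressed size -- before the raw
// LZ4 block, while LZ4_RAW stores the block alone.
public class Lz4FramingDemo {

  // Wrap a raw LZ4 block in the Hadoop-style 8-byte prefix.
  public static byte[] frame(int uncompressedSize, byte[] rawBlock) {
    ByteBuffer buf = ByteBuffer.allocate(8 + rawBlock.length);
    buf.putInt(uncompressedSize); // extra 4 bytes: uncompressed size
    buf.putInt(rawBlock.length);  // extra 4 bytes: compressed size
    buf.put(rawBlock);
    return buf.array();
  }

  // Strip the prefix to recover the bytes a LZ4_RAW reader would expect.
  public static byte[] unframe(byte[] framed) {
    ByteBuffer buf = ByteBuffer.wrap(framed);
    buf.getInt(); // uncompressed size (ignored here)
    int compressedSize = buf.getInt();
    byte[] raw = new byte[compressedSize];
    buf.get(raw);
    return raw;
  }

  public static void main(String[] args) {
    byte[] raw = {1, 2, 3, 4};
    byte[] framed = frame(100, raw);
    System.out.println(framed.length - raw.length); // prints 8
  }
}
```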
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import io.airlift.compress.lz4.Lz4Compressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Lz4RawCompressor extends NonBlockedCompressor {
+
+ private Lz4Compressor compressor = new Lz4Compressor();
Review Comment:
I don't think this is an issue. parquet-mr creates the CompressionInputStream
and CompressionOutputStream from newly allocated Compressor and Decompressor
objects. ZstandardCodec even returns null from its createCompressor and
createDecompressor methods.
##########
parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java:
##########
@@ -30,7 +30,8 @@ public enum CompressionCodecName {
LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
- ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd");
+ ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd"),
+ LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", CompressionCodec.LZ4_RAW, ".lz4");
Review Comment:
I haven't found a matching suffix in the C++ implementation, but I can
switch it to ".lz4_raw" if that makes sense:
https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression.cc#L51
##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.codec;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestInteropReadLz4RawCodec {
+
+ // The link includes a reference to a specific commit. To take a newer version, update this link.
+ private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/19fcd4d/data/";
+ private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+ private static String SIMPLE_FILE = "lz4_raw_compressed.parquet";
+ private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet";
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class);
+ private OkHttpClient httpClient = new OkHttpClient();
+
+ @Test
+ public void testInteropReadLz4RawParquetFiles() throws IOException {
+ Path rootPath = new Path(PARQUET_TESTING_PATH);
+ LOG.info("======== testInteropReadLz4RawParquetFiles {} ========", rootPath.toString());
+
+ // Test simple parquet file with lz4 raw compressed
+ Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient);
+ readParquetFile(simpleFile, 4);
+
+ // Test larger parquet file with lz4 raw compressed
+ Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient);
+ readParquetFile(largerFile, 10000);
+ }
+
+ private Path downloadInteropFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
+ LOG.info("Download interop files if needed");
+ Configuration conf = new Configuration();
+ FileSystem fs = rootPath.getFileSystem(conf);
+ LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+ if (!fs.exists(rootPath)) {
+ LOG.info("Create folder for interop files: " + rootPath);
+ if (!fs.mkdirs(rootPath)) {
+ throw new IOException("Cannot create path " + rootPath);
+ }
+ }
+
+ Path file = new Path(rootPath, fileName);
+ if (!fs.exists(file)) {
+ String downloadUrl = PARQUET_TESTING_REPO + fileName;
+ LOG.info("Download interop file: " + downloadUrl);
+ Request request = new Request.Builder().url(downloadUrl).build();
Review Comment:
I learned it from here:
https://github.com/apache/parquet-mr/blob/a2da156b251d13bce1fa81eb95b555da04880bc1/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java#L454
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Lz4 raw compression codec for Parquet. This codec type was introduced
+ * in parquet format version 2.9.0.
Review Comment:
It looks like the difference between the two LZ4 codecs is not clearly documented:
https://github.com/apache/parquet-format/blob/master/Compression.md
##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java:
##########
@@ -0,0 +1,104 @@
+ ... (license header, imports, and download logic identical to the hunk shown above) ...
+ Response response = httpClient.newCall(request).execute();
+ if (!response.isSuccessful()) {
+ throw new IOException("Failed to download file: " + response);
+ }
+ try (FSDataOutputStream fdos = fs.create(file)) {
+ fdos.write(response.body().bytes());
+ }
+ }
+ return file;
+ }
+
+ private void readParquetFile(Path filePath, int expectedNumRows) throws IOException {
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
+ int numRows = 0;
+ while (reader.read() != null) {
+ numRows++;
Review Comment:
That would be better, but I think this is enough to make sure the decompression
is not corrupted and no rows are missed.
##########
parquet-hadoop/pom.xml:
##########
@@ -102,6 +102,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
Review Comment:
Apache ORC depends on aircompressor to implement its codecs, so I assume
that's fine.
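For context, the dependency block being added presumably looks like the sketch below; the version value is a placeholder, since the actual version sits in the part of the diff that is truncated here:

```xml
<dependency>
  <groupId>io.airlift</groupId>
  <artifactId>aircompressor</artifactId>
  <!-- placeholder; the real version is defined in the truncated diff -->
  <version>${aircompressor.version}</version>
</dependency>
```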
> Support LZ4_RAW codec
> ---------------------
>
> Key: PARQUET-2196
> URL: https://issues.apache.org/jira/browse/PARQUET-2196
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-mr
> Reporter: Gang Wu
> Priority: Major
>
> There is a long history about the LZ4 interoperability of parquet files
> between parquet-mr and parquet-cpp (which now lives in Apache Arrow).
> The attached links are the evidence. In short, a new LZ4_RAW codec type was
> introduced in parquet format v2.9.0. However, only parquet-cpp supports
> LZ4_RAW. The parquet-mr library still uses the old Hadoop-provided LZ4 codec
> and cannot read parquet files written with LZ4_RAW.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)