[ 
https://issues.apache.org/jira/browse/PARQUET-2196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17611441#comment-17611441
 ] 

ASF GitHub Bot commented on PARQUET-2196:
-----------------------------------------

wgtmac commented on code in PR #1000:
URL: https://github.com/apache/parquet-mr/pull/1000#discussion_r984349672


##########
parquet-cli/src/main/java/org/apache/parquet/cli/Util.java:
##########
@@ -151,6 +151,8 @@ public static String shortCodec(CompressionCodecName codec) 
{
         return "B";
       case LZ4:
         return "4";
+      case LZ4_RAW:
+        return "4";

Review Comment:
   In fact they are the same compression except that the LZ4_HADOOP adds extra 
8 bytes for uncompressed size and compressed size. Check here for reference: 
https://github.com/airlift/aircompressor/blob/master/src/main/java/io/airlift/compress/lz4/HadoopLz4OutputStream.java#L97



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java:
##########
@@ -0,0 +1,44 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import io.airlift.compress.lz4.Lz4Compressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Lz4RawCompressor extends NonBlockedCompressor {
+
+  private Lz4Compressor compressor = new Lz4Compressor();

Review Comment:
   I don't think this is an issue. parquet-mr creates CompressionInputStream 
and CompressionOutputStream based on new allocated Compressor and Decompressor 
objects.  ZstandardCodec even returns null for the createCompressor and 
createDecompressor methods.



##########
parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java:
##########
@@ -30,7 +30,8 @@ public enum CompressionCodecName {
   LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
   BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, 
".br"),
   LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
-  ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", 
CompressionCodec.ZSTD, ".zstd");
+  ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", 
CompressionCodec.ZSTD, ".zstd"),
+  LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", 
CompressionCodec.LZ4_RAW, ".lz4");

Review Comment:
   I haven't found the suffix parity in the C++ implementation. But I can 
switch it to "lz4_raw" if that makes sense: 
https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression.cc#L51



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.codec;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestInteropReadLz4RawCodec {
+
+  // The link includes a reference to a specific commit. To take a newer 
version - update this link.
+  private static final String PARQUET_TESTING_REPO = 
"https://github.com/apache/parquet-testing/raw/19fcd4d/data/";;
+  private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+  private static String SIMPLE_FILE = "lz4_raw_compressed.parquet";
+  private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class);
+  private OkHttpClient httpClient = new OkHttpClient();
+
+  @Test
+  public void testInteropReadLz4RawParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testInteropReadLz4RawParquetFiles {} ========", 
rootPath.toString());
+
+    // Test simple parquet file with lz4 raw compressed
+    Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient);
+    readParquetFile(simpleFile, 4);
+
+    // Test larger parquet file with lz4 raw compressed
+    Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient);
+    readParquetFile(largerFile, 10000);
+  }
+
+  private Path downloadInteropFiles(Path rootPath, String fileName, 
OkHttpClient httpClient) throws IOException {
+    LOG.info("Download interop files if needed");
+    Configuration conf = new Configuration();
+    FileSystem fs = rootPath.getFileSystem(conf);
+    LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+    if (!fs.exists(rootPath)) {
+      LOG.info("Create folder for interop files: " + rootPath);
+      if (!fs.mkdirs(rootPath)) {
+        throw new IOException("Cannot create path " + rootPath);
+      }
+    }
+
+    Path file = new Path(rootPath, fileName);
+    if (!fs.exists(file)) {
+      String downloadUrl = PARQUET_TESTING_REPO + fileName;
+      LOG.info("Download interop file: " + downloadUrl);
+      Request request = new Request.Builder().url(downloadUrl).build();

Review Comment:
   I learned it from here: 
https://github.com/apache/parquet-mr/blob/a2da156b251d13bce1fa81eb95b555da04880bc1/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java#L454



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Lz4 raw compression codec for Parquet. This codec type has been introduced
+ * into the parquet format since version 2.9.0.

Review Comment:
   It looks like the differentiation between them is not clearly documented: 
https://github.com/apache/parquet-format/blob/master/Compression.md



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.codec;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestInteropReadLz4RawCodec {
+
+  // The link includes a reference to a specific commit. To take a newer 
version - update this link.
+  private static final String PARQUET_TESTING_REPO = 
"https://github.com/apache/parquet-testing/raw/19fcd4d/data/";;
+  private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+  private static String SIMPLE_FILE = "lz4_raw_compressed.parquet";
+  private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class);
+  private OkHttpClient httpClient = new OkHttpClient();
+
+  @Test
+  public void testInteropReadLz4RawParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testInteropReadLz4RawParquetFiles {} ========", 
rootPath.toString());
+
+    // Test simple parquet file with lz4 raw compressed
+    Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient);
+    readParquetFile(simpleFile, 4);
+
+    // Test larger parquet file with lz4 raw compressed
+    Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient);
+    readParquetFile(largerFile, 10000);
+  }
+
+  private Path downloadInteropFiles(Path rootPath, String fileName, 
OkHttpClient httpClient) throws IOException {
+    LOG.info("Download interop files if needed");
+    Configuration conf = new Configuration();
+    FileSystem fs = rootPath.getFileSystem(conf);
+    LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+    if (!fs.exists(rootPath)) {
+      LOG.info("Create folder for interop files: " + rootPath);
+      if (!fs.mkdirs(rootPath)) {
+        throw new IOException("Cannot create path " + rootPath);
+      }
+    }
+
+    Path file = new Path(rootPath, fileName);
+    if (!fs.exists(file)) {
+      String downloadUrl = PARQUET_TESTING_REPO + fileName;
+      LOG.info("Download interop file: " + downloadUrl);
+      Request request = new Request.Builder().url(downloadUrl).build();
+      Response response = httpClient.newCall(request).execute();
+      if (!response.isSuccessful()) {
+        throw new IOException("Failed to download file: " + response);
+      }
+      try (FSDataOutputStream fdos = fs.create(file)) {
+        fdos.write(response.body().bytes());
+      }
+    }
+    return file;
+  }
+
+  private void readParquetFile(Path filePath, int expectedNumRows) throws 
IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), filePath).build()) {
+      int numRows = 0;
+      while (reader.read() != null) {
+        numRows++;

Review Comment:
   That would be better but I think it is enough to make sure the decompression 
is not corrupted and we haven't missed any row.



##########
parquet-hadoop/pom.xml:
##########
@@ -102,6 +102,11 @@
       <type>jar</type>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>

Review Comment:
   Apache ORC depends on aircompressor to implement its codec. So I assume 
that's fine.





> Support LZ4_RAW codec
> ---------------------
>
>                 Key: PARQUET-2196
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2196
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Gang Wu
>            Priority: Major
>
> There is a long history about the LZ4 interoperability of parquet files 
> between parquet-mr and parquet-cpp (which is now in the Apache Arrow). 
> Attached links are the evidence. In short, a new LZ4_RAW codec type has been 
> introduced since parquet format v2.9.0. However, only parquet-cpp supports 
> LZ4_RAW. The parquet-mr library still uses the old Hadoop-provided LZ4 codec 
> and cannot read parquet files with LZ4_RAW.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to