wgtmac commented on code in PR #1000:
URL: https://github.com/apache/parquet-mr/pull/1000#discussion_r984349672


##########
parquet-cli/src/main/java/org/apache/parquet/cli/Util.java:
##########
@@ -151,6 +151,8 @@ public static String shortCodec(CompressionCodecName codec) 
{
         return "B";
       case LZ4:
         return "4";
+      case LZ4_RAW:
+        return "4";

Review Comment:
   In fact they are the same compression except that the LZ4_HADOOP adds extra 
8 bytes for uncompressed size and compressed size. Check here for reference: 
https://github.com/airlift/aircompressor/blob/master/src/main/java/io/airlift/compress/lz4/HadoopLz4OutputStream.java#L97



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java:
##########
@@ -0,0 +1,44 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import io.airlift.compress.lz4.Lz4Compressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Lz4RawCompressor extends NonBlockedCompressor {
+
+  private Lz4Compressor compressor = new Lz4Compressor();

Review Comment:
   I don't think this is an issue. parquet-mr creates CompressionInputStream 
and CompressionOutputStream based on new allocated Compressor and Decompressor 
objects.  ZstandardCodec even returns null for the createCompressor and 
createDecompressor methods.



##########
parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java:
##########
@@ -30,7 +30,8 @@ public enum CompressionCodecName {
   LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
   BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, 
".br"),
   LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
-  ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", 
CompressionCodec.ZSTD, ".zstd");
+  ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", 
CompressionCodec.ZSTD, ".zstd"),
+  LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", 
CompressionCodec.LZ4_RAW, ".lz4");

Review Comment:
   I haven't found the suffix parity in the C++ implementation. But I can 
switch it to "lz4_raw" if that makes sense: 
https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression.cc#L51



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.codec;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestInteropReadLz4RawCodec {
+
+  // The link includes a reference to a specific commit. To take a newer 
version - update this link.
+  private static final String PARQUET_TESTING_REPO = 
"https://github.com/apache/parquet-testing/raw/19fcd4d/data/";;
+  private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+  private static String SIMPLE_FILE = "lz4_raw_compressed.parquet";
+  private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class);
+  private OkHttpClient httpClient = new OkHttpClient();
+
+  @Test
+  public void testInteropReadLz4RawParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testInteropReadLz4RawParquetFiles {} ========", 
rootPath.toString());
+
+    // Test simple parquet file with lz4 raw compressed
+    Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient);
+    readParquetFile(simpleFile, 4);
+
+    // Test larger parquet file with lz4 raw compressed
+    Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient);
+    readParquetFile(largerFile, 10000);
+  }
+
+  private Path downloadInteropFiles(Path rootPath, String fileName, 
OkHttpClient httpClient) throws IOException {
+    LOG.info("Download interop files if needed");
+    Configuration conf = new Configuration();
+    FileSystem fs = rootPath.getFileSystem(conf);
+    LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+    if (!fs.exists(rootPath)) {
+      LOG.info("Create folder for interop files: " + rootPath);
+      if (!fs.mkdirs(rootPath)) {
+        throw new IOException("Cannot create path " + rootPath);
+      }
+    }
+
+    Path file = new Path(rootPath, fileName);
+    if (!fs.exists(file)) {
+      String downloadUrl = PARQUET_TESTING_REPO + fileName;
+      LOG.info("Download interop file: " + downloadUrl);
+      Request request = new Request.Builder().url(downloadUrl).build();

Review Comment:
   I learned it from here: 
https://github.com/apache/parquet-mr/blob/a2da156b251d13bce1fa81eb95b555da04880bc1/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java#L454



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Lz4 raw compression codec for Parquet. This codec type has been introduced
+ * into the parquet format since version 2.9.0.

Review Comment:
   It looks like the differentiation between them is not clearly documented: 
https://github.com/apache/parquet-format/blob/master/Compression.md



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.codec;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestInteropReadLz4RawCodec {
+
+  // The link includes a reference to a specific commit. To take a newer 
version - update this link.
+  private static final String PARQUET_TESTING_REPO = 
"https://github.com/apache/parquet-testing/raw/19fcd4d/data/";;
+  private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+  private static String SIMPLE_FILE = "lz4_raw_compressed.parquet";
+  private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class);
+  private OkHttpClient httpClient = new OkHttpClient();
+
+  @Test
+  public void testInteropReadLz4RawParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testInteropReadLz4RawParquetFiles {} ========", 
rootPath.toString());
+
+    // Test simple parquet file with lz4 raw compressed
+    Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient);
+    readParquetFile(simpleFile, 4);
+
+    // Test larger parquet file with lz4 raw compressed
+    Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient);
+    readParquetFile(largerFile, 10000);
+  }
+
+  private Path downloadInteropFiles(Path rootPath, String fileName, 
OkHttpClient httpClient) throws IOException {
+    LOG.info("Download interop files if needed");
+    Configuration conf = new Configuration();
+    FileSystem fs = rootPath.getFileSystem(conf);
+    LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+    if (!fs.exists(rootPath)) {
+      LOG.info("Create folder for interop files: " + rootPath);
+      if (!fs.mkdirs(rootPath)) {
+        throw new IOException("Cannot create path " + rootPath);
+      }
+    }
+
+    Path file = new Path(rootPath, fileName);
+    if (!fs.exists(file)) {
+      String downloadUrl = PARQUET_TESTING_REPO + fileName;
+      LOG.info("Download interop file: " + downloadUrl);
+      Request request = new Request.Builder().url(downloadUrl).build();
+      Response response = httpClient.newCall(request).execute();
+      if (!response.isSuccessful()) {
+        throw new IOException("Failed to download file: " + response);
+      }
+      try (FSDataOutputStream fdos = fs.create(file)) {
+        fdos.write(response.body().bytes());
+      }
+    }
+    return file;
+  }
+
+  private void readParquetFile(Path filePath, int expectedNumRows) throws 
IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), filePath).build()) {
+      int numRows = 0;
+      while (reader.read() != null) {
+        numRows++;

Review Comment:
   That would be better but I think it is enough to make sure the decompression 
is not corrupted and we haven't missed any row.



##########
parquet-hadoop/pom.xml:
##########
@@ -102,6 +102,11 @@
       <type>jar</type>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>

Review Comment:
   Apache ORC depends on aircompressor to implement its codec. So I assume 
that's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to