[ https://issues.apache.org/jira/browse/PARQUET-1241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16574328#comment-16574328 ]
Alex Wang edited comment on PARQUET-1241 at 8/9/18 5:56 AM:
------------------------------------------------------------

Hi, [~wesmckinn] and I discussed this issue on the parquet-cpp mailing list. Upon further investigation, I found from the Hadoop JIRA ticket (https://issues.apache.org/jira/browse/HADOOP-12990) that the Hadoop LZ4 format prefixes the compressed data with the *original data length (big-endian)* followed by the *compressed data length (big-endian)*. By stepping through the *parquet_reader* binary in gdb while it read a parquet-mr (1.10.0 release) generated Parquet file with LZ4 compression, I confirmed that the compressed column page buffer does indeed carry this 8-byte prefix.

{noformat}
# From gdb:
Breakpoint 1, arrow::Lz4Codec::Decompress (this=0xc1d5e0, input_len=109779,
    input=0x7ffff665442e "", output_len=155352, output_buffer=0x7ffff624b040 "")
    at /opt/parquet-cpp/arrow_ep-prefix/src/arrow_ep/cpp/src/arrow/util/compression_lz4.cc:36
36          static_cast<int>(input_len), static_cast<int>(output_len));
Missing separate debuginfos, use: debuginfo-install glibc-2.17-222.el7.x86_64
    libgcc-4.8.5-28.el7_5.1.x86_64 libstdc++-4.8.5-28.el7_5.1.x86_64
(gdb) p/x *((uint32_t*)input+1)
$3 = 0xcbac0100

# From python (byte-swap the big-endian value):
>>> 0x0001accb
109771
{noformat}

The result is *109771 = 109779 - 8*: the second 32-bit word of the prefix, byte-swapped from big-endian, is exactly the input length minus the 8-byte prefix. And if I skip the first 8 bytes before decompressing, I get the correct column values.
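To make the prefix check concrete, here is a minimal standalone sketch of the heuristic, independent of Arrow's BitUtil helpers. The names ReadBigEndian32 and LooksLikeHadoopLz4 are hypothetical, for illustration only, and the sketch assumes a single compressed chunk follows the 8-byte prefix, which matches what I observed above:

{noformat}
// Hypothetical sketch (not Arrow code): decode the two 4-byte big-endian
// length words that the Hadoop codec writes before the raw LZ4 block.
#include <cstdint>

static uint32_t ReadBigEndian32(const uint8_t* p) {
  return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
         (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}

// The buffer plausibly uses the Hadoop framing
//   [original length (BE)][compressed length (BE)][raw LZ4 block]
// when the second word equals input_len - 8; comparing the first word against
// the expected uncompressed length is an extra sanity check.
static bool LooksLikeHadoopLz4(const uint8_t* input, int64_t input_len,
                               int64_t expected_output_len) {
  if (input_len < 8) return false;
  const uint64_t original_len = ReadBigEndian32(input);
  const uint64_t compressed_len = ReadBigEndian32(input + 4);
  return compressed_len == static_cast<uint64_t>(input_len) - 8 &&
         original_len == static_cast<uint64_t>(expected_output_len);
}
{noformat}

For the buffer captured in the gdb session above, ReadBigEndian32(input + 4) yields 0x0001accb = 109771 = 109779 - 8, and ReadBigEndian32(input) should yield 155352 (the output_len), so the check fires and decompression can retry at input + 8.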
Since it seems unlikely that Hadoop will change this format (it has been in place since 2011), I'd like to propose a change like the one below:

{noformat}
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 23a5c39..feeb124 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -22,6 +22,7 @@
 #include <lz4.h>
 
 #include "arrow/status.h"
+#include "arrow/util/bit-util.h"
 #include "arrow/util/macros.h"
 
 namespace arrow {
@@ -35,6 +36,19 @@ Status Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, int64_t out
       reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
       static_cast<int>(input_len), static_cast<int>(output_len));
   if (decompressed_size < 0) {
+    // For hadoop lz4 compression format, the compressed data is prefixed
+    // with original data length (big-endian) and then compressed data
+    // length (big-endian).
+    //
+    // If the prefix could match the format, try to decompress from 'input + 8'.
+    if (BitUtil::FromBigEndian(*(reinterpret_cast<const uint32_t*>(input) + 1)) == input_len - 8) {
+      decompressed_size = LZ4_decompress_safe(
+          reinterpret_cast<const char*>(input) + 8, reinterpret_cast<char*>(output_buffer),
+          static_cast<int>(input_len) - 8, static_cast<int>(output_len));
+      if (decompressed_size >= 0) {
+        return Status::OK();
+      }
+    }
     return Status::IOError("Corrupt Lz4 compressed data.");
   }
   return Status::OK();
{noformat}


> Use LZ4 frame format
> --------------------
>
>                 Key: PARQUET-1241
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1241
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-cpp, parquet-format
>            Reporter: Lawrence Chan
>            Priority: Major
>
> The parquet-format spec doesn't currently specify whether lz4-compressed data should be framed or not. We should choose one and make it explicit in the spec, as they are not inter-operable. After some discussions with others [1], we think it would be beneficial to use the framed format, which adds a small header in exchange for more self-contained decompression as well as a richer feature set (checksums, parallel decompression, etc).
> The current arrow implementation compresses using the lz4 block format, and this would need to be updated when we add the spec clarification.
> If backwards compatibility is a concern, I would suggest adding an additional LZ4_FRAMED compression type, but that may be more noise than anything.
> [1] https://github.com/dask/fastparquet/issues/314

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)