wesm commented on a change in pull request #7789:
URL: https://github.com/apache/arrow/pull/7789#discussion_r463842568



##########
File path: cpp/src/arrow/util/compression.cc
##########
@@ -131,7 +131,7 @@ Result<std::unique_ptr<Codec>> 
Codec::Create(Compression::type codec_type,
       if (compression_level_set) {
         return Status::Invalid("LZ4 doesn't support setting a compression 
level.");
       }
-      codec = internal::MakeLz4RawCodec();
+      codec = internal::MakeLz4HadoopRawCodec();

Review comment:
       This is a pain, but I think we need to add a separate `Compression::LZ4_HADOOP` type rather than swapping the codec that is created here.

##########
File path: cpp/src/arrow/util/compression_lz4.cc
##########
@@ -349,6 +350,90 @@ class Lz4Codec : public Codec {
   const char* name() const override { return "lz4_raw"; }
 };
 
+// ----------------------------------------------------------------------
+// Lz4 Hadoop "raw" codec implementation
+
+class Lz4HadoopCodec : public Lz4Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* 
output_buffer) override {
+    // The following variables only make sense if the parquet file being read 
was
+    // compressed using the Hadoop Lz4Codec.
+    //
+    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
+    // of the input buffer two uint32_t's representing (in this order) expected
+    // decompressed size in bytes and expected compressed size in bytes.
+    const uint32_t* input_as_uint32 = reinterpret_cast<const uint32_t*>(input);
+    uint32_t expected_decompressed_size = 
ARROW_BYTE_SWAP32(input_as_uint32[0]);
+    uint32_t expected_compressed_size = ARROW_BYTE_SWAP32(input_as_uint32[1]);
+    int64_t lz4_compressed_buffer_size = input_len - data_byte_offset;
+
+    // We use a heuristic to determine if the parquet file being read
+    // was compressed using the Hadoop Lz4Codec.
+    int64_t decompressed_size;
+    if (lz4_compressed_buffer_size != expected_compressed_size) {
+      // Parquet file was compressed without Hadoop Lz4Codec
+      ARROW_ASSIGN_OR_RAISE(
+          decompressed_size,
+          Lz4Codec::Decompress(input_len, input, output_buffer_len, 
output_buffer));
+    } else {
+      // Parquet file was likely compressed with Hadoop Lz4Codec
+      Result<int64_t> decompressed_size_result =
+          Lz4Codec::Decompress(lz4_compressed_buffer_size, input + 
data_byte_offset,
+                               output_buffer_len, output_buffer);
+
+      if (!decompressed_size_result.ok() ||
+          decompressed_size_result.ValueOrDie() != expected_decompressed_size) 
{
+        // Fall back on regular LZ4-block decompression
+        ARROW_ASSIGN_OR_RAISE(
+            decompressed_size,
+            Lz4Codec::Decompress(input_len, input, output_buffer_len, 
output_buffer));
+      } else {
+        decompressed_size = decompressed_size_result.ValueOrDie();
+      }
+    }
+
+    return decompressed_size;
+  }
+
+  int64_t MaxCompressedLen(int64_t input_len,
+                           const uint8_t* ARROW_ARG_UNUSED(input)) override {
+    return data_byte_offset + Lz4Codec::MaxCompressedLen(input_len, nullptr);
+  }
+
+  Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
+                           int64_t output_buffer_len, uint8_t* output_buffer) 
override {
+    ARROW_ASSIGN_OR_RAISE(int64_t output_len,
+                          Lz4Codec::Compress(input_len, input, 
output_buffer_len,
+                                             output_buffer + 
data_byte_offset));
+
+    // Prepend decompressed size in bytes and compressed size in bytes
+    // to be compatible with Hadoop Lz4Codec
+    ((uint32_t*)output_buffer)[0] = ARROW_BYTE_SWAP32((uint32_t)input_len);
+    ((uint32_t*)output_buffer)[1] = ARROW_BYTE_SWAP32((uint32_t)output_len);
+
+    return data_byte_offset + output_len;
+  }
+
+  Result<std::shared_ptr<Compressor>> MakeCompressor() override {
+    return Status::NotImplemented(
+        "Streaming compression unsupported with LZ4 Hadoop raw format. "
+        "Try using LZ4 frame format instead.");
+  }
+
+  Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
+    return Status::NotImplemented(
+        "Streaming decompression unsupported with LZ4 Hadoop raw format. "
+        "Try using LZ4 frame format instead.");
+  }
+
+  const char* name() const override { return "lz4_hadoop_raw"; }
+
+ protected:
+  // Offset starting at which page data can be read/written
+  static const int64_t data_byte_offset = sizeof(uint32_t) * 2;

Review comment:
       Use the `kDataByteOffset` naming style for this constant. A more descriptive name, such as `kPrefixLength`, might be clearer.

##########
File path: cpp/src/arrow/util/compression_lz4.cc
##########
@@ -349,6 +350,90 @@ class Lz4Codec : public Codec {
   const char* name() const override { return "lz4_raw"; }
 };
 
+// ----------------------------------------------------------------------
+// Lz4 Hadoop "raw" codec implementation
+
+class Lz4HadoopCodec : public Lz4Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* 
output_buffer) override {
+    // The following variables only make sense if the parquet file being read 
was
+    // compressed using the Hadoop Lz4Codec.
+    //
+    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
+    // of the input buffer two uint32_t's representing (in this order) expected
+    // decompressed size in bytes and expected compressed size in bytes.
+    const uint32_t* input_as_uint32 = reinterpret_cast<const uint32_t*>(input);
+    uint32_t expected_decompressed_size = 
ARROW_BYTE_SWAP32(input_as_uint32[0]);
+    uint32_t expected_compressed_size = ARROW_BYTE_SWAP32(input_as_uint32[1]);

Review comment:
       Should this be using `FromBigEndian` and `SafeLoadAs` (assuming these are big-endian Java integers)? Then you can just use `input` and `input + sizeof(uint32_t)`, which is clearer anyway.
   
   cc @kiszk 

##########
File path: cpp/src/arrow/util/compression_lz4.cc
##########
@@ -349,6 +350,90 @@ class Lz4Codec : public Codec {
   const char* name() const override { return "lz4_raw"; }
 };
 
+// ----------------------------------------------------------------------
+// Lz4 Hadoop "raw" codec implementation
+
+class Lz4HadoopCodec : public Lz4Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* 
output_buffer) override {
+    // The following variables only make sense if the parquet file being read 
was
+    // compressed using the Hadoop Lz4Codec.
+    //
+    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
+    // of the input buffer two uint32_t's representing (in this order) expected
+    // decompressed size in bytes and expected compressed size in bytes.
+    const uint32_t* input_as_uint32 = reinterpret_cast<const uint32_t*>(input);
+    uint32_t expected_decompressed_size = 
ARROW_BYTE_SWAP32(input_as_uint32[0]);
+    uint32_t expected_compressed_size = ARROW_BYTE_SWAP32(input_as_uint32[1]);
+    int64_t lz4_compressed_buffer_size = input_len - data_byte_offset;
+
+    // We use a heuristic to determine if the parquet file being read
+    // was compressed using the Hadoop Lz4Codec.
+    int64_t decompressed_size;
+    if (lz4_compressed_buffer_size != expected_compressed_size) {
+      // Parquet file was compressed without Hadoop Lz4Codec
+      ARROW_ASSIGN_OR_RAISE(
+          decompressed_size,
+          Lz4Codec::Decompress(input_len, input, output_buffer_len, 
output_buffer));
+    } else {
+      // Parquet file was likely compressed with Hadoop Lz4Codec
+      Result<int64_t> decompressed_size_result =
+          Lz4Codec::Decompress(lz4_compressed_buffer_size, input + 
data_byte_offset,
+                               output_buffer_len, output_buffer);
+
+      if (!decompressed_size_result.ok() ||
+          decompressed_size_result.ValueOrDie() != expected_decompressed_size) 
{
+        // Fall back on regular LZ4-block decompression
+        ARROW_ASSIGN_OR_RAISE(
+            decompressed_size,
+            Lz4Codec::Decompress(input_len, input, output_buffer_len, 
output_buffer));
+      } else {
+        decompressed_size = decompressed_size_result.ValueOrDie();
+      }
+    }
+
+    return decompressed_size;
+  }
+
+  int64_t MaxCompressedLen(int64_t input_len,
+                           const uint8_t* ARROW_ARG_UNUSED(input)) override {
+    return data_byte_offset + Lz4Codec::MaxCompressedLen(input_len, nullptr);
+  }
+
+  Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
+                           int64_t output_buffer_len, uint8_t* output_buffer) 
override {
+    ARROW_ASSIGN_OR_RAISE(int64_t output_len,
+                          Lz4Codec::Compress(input_len, input, 
output_buffer_len,
+                                             output_buffer + 
data_byte_offset));
+
+    // Prepend decompressed size in bytes and compressed size in bytes
+    // to be compatible with Hadoop Lz4Codec
+    ((uint32_t*)output_buffer)[0] = ARROW_BYTE_SWAP32((uint32_t)input_len);
+    ((uint32_t*)output_buffer)[1] = ARROW_BYTE_SWAP32((uint32_t)output_len);

Review comment:
       This must take big-endian platforms into account, and it should use `memcpy` to avoid ASAN/UBSAN failures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to