This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new c839292d8 PARQUET-2444: Define/enforce contract for codecs (#1292)
c839292d8 is described below

commit c839292d8f4545ec9193c688f39305b8e66aa502
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Thu Mar 7 09:33:14 2024 +0100

    PARQUET-2444: Define/enforce contract for codecs (#1292)
    
    * Added javadoc comments to specify the contracts in
      CompressionCodecFactory
    * Updated the related unit test to validate the contract on the
      different implementations
    * Made Lz4RawCodec implement direct decompression so we can also
      cover this scenario in the unit test
    
    Fixed discovered issues based on the new testing:
    * Properly set the ByteBuffer indices for the related implementations
      according to the defined contract
    * Fix issue at DirectCodecFactory that FullDirectDecompressor never
      worked
    * Fix ParquetWriter builder to pass through the codec factory
---
 .../compression/CompressionCodecFactory.java       | 113 ++++++++++++++-
 .../org/apache/parquet/hadoop/CodecFactory.java    |  37 +++--
 .../parquet/hadoop/ColumnChunkPageReadStore.java   |  19 +--
 .../apache/parquet/hadoop/DirectCodecFactory.java  |  96 ++++++-------
 .../org/apache/parquet/hadoop/ParquetWriter.java   |   2 +
 .../apache/parquet/hadoop/codec/Lz4RawCodec.java   |   9 +-
 .../parquet/hadoop/codec/Lz4RawDecompressor.java   |   8 +-
 .../parquet/hadoop/TestDirectCodecFactory.java     | 154 ++++++++++++++-------
 8 files changed, 310 insertions(+), 128 deletions(-)

diff --git 
a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
 
b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
index 4f31a3f8a..561dcb899 100644
--- 
a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
+++ 
b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
@@ -24,28 +24,137 @@ import java.nio.ByteBuffer;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
+/**
+ * Factory for creating (and potentially caching) {@link BytesInputCompressor} 
and {@link BytesInputDecompressor}
+ * instances to compress/decompress page data.
+ * <p>
+ * The factory instance shall be released after use. The 
compressor/decompressor instances shall not be used after
+ * release.
+ *
+ * @see #release()
+ */
 public interface CompressionCodecFactory {
 
+  /**
+   * Returns a {@link BytesInputCompressor} instance for the specified codec 
name to be used for compressing page data.
+   * <p>
+   * The compressor is not thread-safe, so one instance for each working 
thread is required.
+   *
+   * @param codecName the codec name for which the compressor instance is to be 
returned
+   * @return the compressor instance for the specified codec name
+   * @see BytesInputCompressor#release()
+   */
   BytesInputCompressor getCompressor(CompressionCodecName codecName);
 
+  /**
+   * Returns a {@link BytesInputDecompressor} instance for the specified codec 
name to be used for decompressing page
+   * data.
+   * <p>
+   * The decompressor is not thread-safe, so one instance for each working 
thread is required.
+   *
+   * @param codecName the codec name for which the decompressor instance is to be 
returned
+   * @return the decompressor instance for the specified codec name
+   * @see BytesInputDecompressor#release()
+   */
   BytesInputDecompressor getDecompressor(CompressionCodecName codecName);
 
+  /**
+   * Releases this factory instance.
+   * <p>
+   * Each compressor/decompressor instance shall be released before invoking 
this. Neither the compressor/decompressor
+   * instances retrieved from this factory nor this factory instance itself 
shall be used after release.
+   *
+   * @see BytesInputCompressor#release()
+   * @see BytesInputDecompressor#release()
+   */
   void release();
 
+  /**
+   * Compressor instance of a specific codec to be used for compressing page 
data.
+   * <p>
+   * This compressor shall be released after use. This compressor shall not be 
used after release.
+   *
+   * @see #release()
+   */
   interface BytesInputCompressor {
+
+    /**
+     * Compresses the specified {@link BytesInput} data and returns it as 
{@link BytesInput}.
+     * <p>
+     * Depending on the implementation {@code bytes} might be completely 
consumed. The returned {@link BytesInput}
+     * instance needs to be consumed before using this compressor again. This 
is because the implementation might use
+     * its internal buffer to directly provide the returned {@link BytesInput} 
instance.
+     *
+     * @param bytes the page data to be compressed
+     * @return a {@link BytesInput} containing the compressed data. Needs to 
be consumed before using this compressor
+     * again.
+     * @throws IOException if any I/O error occurs during the compression
+     */
     BytesInput compress(BytesInput bytes) throws IOException;
 
+    /**
+     * Returns the codec name of this compressor.
+     *
+     * @return the codec name
+     */
     CompressionCodecName getCodecName();
 
+    /**
+     * Releases this compressor instance.
+     * <p>
+     * Neither this instance nor the {@link 
BytesInput} instance returned by
+     * {@link #compress(BytesInput)} shall be used after release.
+     */
     void release();
   }
 
+  /**
+   * Decompressor instance of a specific codec to be used for decompressing 
page data.
+   * <p>
+   * This decompressor shall be released after use. This decompressor shall 
not be used after release.
+   *
+   * @see #release()
+   */
   interface BytesInputDecompressor {
-    BytesInput decompress(BytesInput bytes, int uncompressedSize) throws 
IOException;
 
-    void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, 
int uncompressedSize)
+    /**
+     * Decompresses the specified {@link BytesInput} data and returns it as 
{@link BytesInput}.
+     * <p>
+     * The decompressed data must have the size specified. Depending on the 
implementation {@code bytes} might be
+     * completely consumed. The returned {@link BytesInput} instance needs to 
be consumed before using this decompressor
+     * again. This is because the implementation might use its internal buffer 
to directly provide the returned
+     * {@link BytesInput} instance.
+     *
+     * @param bytes            the page data to be decompressed
+     * @param decompressedSize the exact size of the decompressed data
+     * @return a {@link BytesInput} containing the decompressed data. Needs to 
be consumed before using this
+     * decompressor again.
+     * @throws IOException if any I/O error occurs during the decompression
+     */
+    BytesInput decompress(BytesInput bytes, int decompressedSize) throws 
IOException;
+
+    /**
+     * Decompresses {@code compressedSize} bytes from {@code input} from the 
current position. The decompressed bytes are
+     * to be written into {@code output} from its current position. The 
decompressed data must have the size specified.
+     * <p>
+     * {@code output} must have at least {@code 
decompressedSize} bytes remaining. According to the {@link ByteBuffer}
+     * contract the position of {@code input} will be increased by {@code 
compressedSize}, and the position of
+     * {@code output} will be increased by {@code decompressedSize}. (It 
means, one would have to flip the output buffer
+     * before reading the decompressed data from it.)
+     *
+     * @param input            the input buffer where the data is to be 
decompressed from
+     * @param compressedSize   the exact size of the compressed (input) data
+     * @param output           the output buffer where the data is to be 
decompressed into
+     * @param decompressedSize the exact size of the decompressed (output) data
+     * @throws IOException if any I/O error occurs during the decompression
+     */
+    void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, 
int decompressedSize)
         throws IOException;
 
+    /**
+     * Releases this decompressor instance. Neither this 
instance nor the {@link BytesInput}
+     * instance returned by {@link #decompress(BytesInput, int)} shall be used 
after release.
+     */
     void release();
   }
 }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 98c7002fe..f0775484c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -56,16 +56,20 @@ public class CodecFactory implements 
CompressionCodecFactory {
 
   static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
     @Override
-    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize) {
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int decompressedSize) {
       Preconditions.checkArgument(
-          compressedSize == uncompressedSize,
-          "Non-compressed data did not have matching compressed and 
uncompressed sizes.");
-      output.clear();
-      output.put((ByteBuffer) 
input.duplicate().position(0).limit(compressedSize));
+          compressedSize == decompressedSize,
+          "Non-compressed data did not have matching compressed and 
decompressed sizes.");
+      Preconditions.checkArgument(
+          input.remaining() >= compressedSize, "Not enough bytes available in 
the input buffer");
+      int origLimit = input.limit();
+      input.limit(input.position() + compressedSize);
+      output.put(input);
+      input.limit(origLimit);
     }
 
     @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) {
+    public BytesInput decompress(BytesInput bytes, int decompressedSize) {
       return bytes;
     }
 
@@ -150,7 +154,7 @@ public class CodecFactory implements 
CompressionCodecFactory {
     }
 
     @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
+    public BytesInput decompress(BytesInput bytes, int decompressedSize) 
throws IOException {
       final BytesInput decompressed;
       if (decompressor != null) {
         decompressor.reset();
@@ -162,20 +166,27 @@ public class CodecFactory implements 
CompressionCodecFactory {
       // This change will load the decompressor stream into heap a little 
earlier, since the problem it solves
       // only happens in the ZSTD codec, so this modification is only made for 
ZSTD streams.
       if (codec instanceof ZstandardCodec) {
-        decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+        decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize));
         is.close();
       } else {
-        decompressed = BytesInput.from(is, uncompressedSize);
+        decompressed = BytesInput.from(is, decompressedSize);
       }
       return decompressed;
     }
 
     @Override
-    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize)
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int decompressedSize)
         throws IOException {
+      Preconditions.checkArgument(
+          input.remaining() >= compressedSize, "Not enough bytes available in 
the input buffer");
+      int origLimit = input.limit();
+      int origPosition = input.position();
+      input.limit(origPosition + compressedSize);
       ByteBuffer decompressed =
-          decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
+          decompress(BytesInput.from(input), decompressedSize).toByteBuffer();
       output.put(decompressed);
+      input.limit(origLimit);
+      input.position(origPosition + compressedSize);
     }
 
     public void release() {
@@ -338,9 +349,9 @@ public class CodecFactory implements 
CompressionCodecFactory {
    */
   @Deprecated
   public abstract static class BytesDecompressor implements 
CompressionCodecFactory.BytesInputDecompressor {
-    public abstract BytesInput decompress(BytesInput bytes, int 
uncompressedSize) throws IOException;
+    public abstract BytesInput decompress(BytesInput bytes, int 
decompressedSize) throws IOException;
 
-    public abstract void decompress(ByteBuffer input, int compressedSize, 
ByteBuffer output, int uncompressedSize)
+    public abstract void decompress(ByteBuffer input, int compressedSize, 
ByteBuffer output, int decompressedSize)
         throws IOException;
 
     public abstract void release();
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index cc20c1562..c7fc22b29 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -168,12 +168,7 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
                   decompressedBuffer,
                   dataPageV1.getUncompressedSize());
               setDecompressMetrics(bytes, start);
-
-              // HACKY: sometimes we need to do `flip` because the position of 
output bytebuffer is
-              // not reset.
-              if (decompressedBuffer.position() != 0) {
-                decompressedBuffer.flip();
-              }
+              decompressedBuffer.flip();
               decompressed = BytesInput.from(decompressedBuffer);
             } else { // use on-heap buffer
               if (null != blockDecryptor) {
@@ -225,9 +220,6 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
           }
           BytesInput pageBytes = dataPageV2.getData();
           try {
-            BytesInput decompressed;
-            long compressedSize;
-
             if (options.getAllocator().isDirect() && 
options.useOffHeapDecryptBuffer()) {
               ByteBuffer byteBuffer = pageBytes.toByteBuffer(releaser);
               if (!byteBuffer.isDirect()) {
@@ -236,7 +228,7 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
               if (blockDecryptor != null) {
                 byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
               }
-              compressedSize = byteBuffer.limit();
+              long compressedSize = byteBuffer.limit();
               if (dataPageV2.isCompressed()) {
                 int uncompressedSize = 
Math.toIntExact(dataPageV2.getUncompressedSize()
                     - dataPageV2.getDefinitionLevels().size()
@@ -248,12 +240,7 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
                 decompressor.decompress(
                     byteBuffer, (int) compressedSize, decompressedBuffer, 
uncompressedSize);
                 setDecompressMetrics(pageBytes, start);
-
-                // HACKY: sometimes we need to do `flip` because the position 
of output bytebuffer is
-                // not reset.
-                if (decompressedBuffer.position() != 0) {
-                  decompressedBuffer.flip();
-                }
+                decompressedBuffer.flip();
                 pageBytes = BytesInput.from(decompressedBuffer);
               } else {
                 pageBytes = BytesInput.from(byteBuffer);
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index f509dce55..523e57dbf 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -68,7 +68,8 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     try {
       tempClass = 
Class.forName("org.apache.hadoop.io.compress.DirectDecompressionCodec");
       tempCreateMethod = tempClass.getMethod("createDirectDecompressor");
-      tempDecompressMethod = tempClass.getMethod("decompress", 
ByteBuffer.class, ByteBuffer.class);
+      Class<?> tempClass2 = 
Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
+      tempDecompressMethod = tempClass2.getMethod("decompress", 
ByteBuffer.class, ByteBuffer.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       // do nothing, the class will just be assigned null
     }
@@ -150,27 +151,25 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     }
 
     @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
+    public BytesInput decompress(BytesInput bytes, int decompressedSize) 
throws IOException {
       decompressor.reset();
       byte[] inputBytes = bytes.toByteArray();
       decompressor.setInput(inputBytes, 0, inputBytes.length);
-      byte[] output = new byte[uncompressedSize];
-      decompressor.decompress(output, 0, uncompressedSize);
+      byte[] output = new byte[decompressedSize];
+      decompressor.decompress(output, 0, decompressedSize);
       return BytesInput.from(output);
     }
 
     @Override
-    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize)
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int decompressedSize)
         throws IOException {
 
       decompressor.reset();
       byte[] inputBytes = new byte[compressedSize];
-      input.position(0);
       input.get(inputBytes);
       decompressor.setInput(inputBytes, 0, inputBytes.length);
-      byte[] outputBytes = new byte[uncompressedSize];
-      decompressor.decompress(outputBytes, 0, uncompressedSize);
-      output.clear();
+      byte[] outputBytes = new byte[decompressedSize];
+      decompressor.decompress(outputBytes, 0, decompressedSize);
       output.put(outputBytes);
     }
 
@@ -193,14 +192,14 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     }
 
     @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
+    public BytesInput decompress(BytesInput bytes, int decompressedSize) 
throws IOException {
       try (ByteBufferReleaser releaser = inputAllocator.getReleaser()) {
         ByteBuffer input = bytes.toByteBuffer(releaser);
-        ByteBuffer output = outputAllocator.allocate(uncompressedSize);
+        ByteBuffer output = outputAllocator.allocate(decompressedSize);
         int size = decompress(input.slice(), output.slice());
-        if (size != uncompressedSize) {
+        if (size != decompressedSize) {
           throw new DirectCodecPool.ParquetCompressionCodecException(
-              "Unexpected decompressed size: " + size + " != " + 
uncompressedSize);
+              "Unexpected decompressed size: " + size + " != " + 
decompressedSize);
         }
         output.limit(size);
         return BytesInput.from(output);
@@ -210,26 +209,26 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     abstract int decompress(ByteBuffer input, ByteBuffer output) throws 
IOException;
 
     @Override
-    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize)
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int decompressedSize)
         throws IOException {
+      int origInputLimit = input.limit();
       input.limit(input.position() + compressedSize);
-      output.limit(output.position() + uncompressedSize);
+      int origOutputLimit = output.limit();
+      output.limit(output.position() + decompressedSize);
       int size = decompress(input.slice(), output.slice());
-      if (size != uncompressedSize) {
+      if (size != decompressedSize) {
         throw new DirectCodecPool.ParquetCompressionCodecException(
-            "Unexpected decompressed size: " + size + " != " + 
uncompressedSize);
+            "Unexpected decompressed size: " + size + " != " + 
decompressedSize);
       }
       input.position(input.limit());
-      output.position(output.position() + uncompressedSize);
+      input.limit(origInputLimit);
+      output.position(output.limit());
+      output.limit(origOutputLimit);
     }
 
     @Override
     public void release() {
-      try {
-        AutoCloseables.uncheckedClose(outputAllocator, inputAllocator);
-      } finally {
-        closeDecompressor();
-      }
+      AutoCloseables.uncheckedClose(outputAllocator, inputAllocator, 
this::closeDecompressor);
     }
 
     abstract void closeDecompressor();
@@ -264,11 +263,7 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
 
     @Override
     public void release() {
-      try {
-        AutoCloseables.uncheckedClose(outputAllocator, inputAllocator);
-      } finally {
-        closeCompressor();
-      }
+      AutoCloseables.uncheckedClose(outputAllocator, inputAllocator, 
this::closeCompressor);
     }
 
     abstract void closeCompressor();
@@ -296,22 +291,22 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     }
 
     @Override
-    public BytesInput decompress(BytesInput compressedBytes, int 
uncompressedSize) throws IOException {
+    public BytesInput decompress(BytesInput compressedBytes, int 
decompressedSize) throws IOException {
       // Similarly to non-direct decompressors, we reset before use, if 
possible (see HeapBytesDecompressor)
       if (decompressor instanceof Decompressor) {
         ((Decompressor) decompressor).reset();
       }
-      return super.decompress(compressedBytes, uncompressedSize);
+      return super.decompress(compressedBytes, decompressedSize);
     }
 
     @Override
-    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize)
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int decompressedSize)
         throws IOException {
       // Similarly to non-direct decompressors, we reset before use, if 
possible (see HeapBytesDecompressor)
       if (decompressor instanceof Decompressor) {
         ((Decompressor) decompressor).reset();
       }
-      super.decompress(input, compressedSize, output, uncompressedSize);
+      super.decompress(input, compressedSize, output, decompressedSize);
     }
 
     @Override
@@ -322,7 +317,10 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
       } catch (IllegalAccessException | InvocationTargetException e) {
         throw new DirectCodecPool.ParquetCompressionCodecException(e);
       }
-      return output.position() - startPos;
+      int size = output.position() - startPos;
+      // Some decompressors flip the output buffer, some don't:
+      // Let's rely on the limit if the position did not change
+      return size == 0 ? output.limit() : size;
     }
 
     @Override
@@ -338,22 +336,20 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
   public class NoopDecompressor extends BytesDecompressor {
 
     @Override
-    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize)
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int decompressedSize)
         throws IOException {
-      Preconditions.checkArgument(
-          compressedSize == uncompressedSize,
-          "Non-compressed data did not have matching compressed and 
uncompressed sizes.");
-      output.clear();
-      output.put((ByteBuffer) 
input.duplicate().position(0).limit(compressedSize));
+      NO_OP_DECOMPRESSOR.decompress(input, compressedSize, output, 
decompressedSize);
     }
 
     @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
-      return bytes;
+    public BytesInput decompress(BytesInput bytes, int decompressedSize) 
throws IOException {
+      return NO_OP_DECOMPRESSOR.decompress(bytes, decompressedSize);
     }
 
     @Override
-    public void release() {}
+    public void release() {
+      NO_OP_DECOMPRESSOR.release();
+    }
   }
 
   public class SnappyDecompressor extends BaseDecompressor {
@@ -416,6 +412,8 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
       context = new ZstdCompressCtx();
       context.setLevel(configuration.getInt(
           ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, 
ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL));
+      context.setWorkers(configuration.getInt(
+          ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, 
ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
     }
 
     @Override
@@ -449,16 +447,18 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
 
     @Override
     public BytesInput compress(BytesInput bytes) throws IOException {
-      return bytes;
+      return NO_OP_COMPRESSOR.compress(bytes);
     }
 
     @Override
     public CompressionCodecName getCodecName() {
-      return CompressionCodecName.UNCOMPRESSED;
+      return NO_OP_COMPRESSOR.getCodecName();
     }
 
     @Override
-    public void release() {}
+    public void release() {
+      NO_OP_COMPRESSOR.release();
+    }
   }
 
   static class DirectCodecPool {
@@ -486,7 +486,8 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
 
       private CodecPool(final CompressionCodec codec) {
         try {
-          boolean supportDirectDecompressor = codec.getClass() == 
DIRECT_DECOMPRESSION_CODEC_CLASS;
+          boolean supportDirectDecompressor = DIRECT_DECOMPRESSION_CODEC_CLASS 
!= null
+              && 
DIRECT_DECOMPRESSION_CODEC_CLASS.isAssignableFrom(codec.getClass());
           compressorPool = new GenericObjectPool(
               new BasePoolableObjectFactory() {
                 public Object makeObject() throws Exception {
@@ -533,8 +534,7 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
             directDecompressorPool = new GenericObjectPool(
                 new BasePoolableObjectFactory() {
                   public Object makeObject() throws Exception {
-                    return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(
-                        DIRECT_DECOMPRESSION_CODEC_CLASS);
+                    return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(codec);
                   }
                 },
                 Integer.MAX_VALUE);
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 22dc7e30f..a76b843e7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -916,6 +916,7 @@ public class ParquetWriter<T> implements Closeable {
             mode,
             getWriteSupport(conf),
             codecName,
+            codecFactory,
             rowGroupSize,
             enableValidation,
             conf,
@@ -928,6 +929,7 @@ public class ParquetWriter<T> implements Closeable {
             mode,
             getWriteSupport(conf),
             codecName,
+            codecFactory,
             rowGroupSize,
             enableValidation,
             conf,
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
index fd8c1a81e..6a25c5982 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 
 /**
  * Lz4 raw compression codec for Parquet. This codec type has been introduced
@@ -39,7 +41,7 @@ import org.apache.hadoop.io.compress.Decompressor;
  * below for reference.
  * https://github.com/apache/parquet-format/blob/master/Compression.md
  */
-public class Lz4RawCodec implements Configurable, CompressionCodec {
+public class Lz4RawCodec implements Configurable, CompressionCodec, 
DirectDecompressionCodec {
 
   private Configuration conf;
 
@@ -68,6 +70,11 @@ public class Lz4RawCodec implements Configurable, 
CompressionCodec {
     return new Lz4RawDecompressor();
   }
 
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return new Lz4RawDecompressor();
+  }
+
   @Override
   public CompressionInputStream createInputStream(InputStream stream) throws 
IOException {
     return createInputStream(stream, createDecompressor());
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
index 7477bda87..68839d281 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
@@ -21,8 +21,9 @@ package org.apache.parquet.hadoop.codec;
 import io.airlift.compress.lz4.Lz4Decompressor;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 
-public class Lz4RawDecompressor extends NonBlockedDecompressor {
+public class Lz4RawDecompressor extends NonBlockedDecompressor implements 
DirectDecompressor {
 
   private Lz4Decompressor decompressor = new Lz4Decompressor();
 
@@ -41,4 +42,9 @@ public class Lz4RawDecompressor extends 
NonBlockedDecompressor {
     uncompressed.rewind();
     return uncompressedSize;
   }
+
+  @Override
+  public void decompress(ByteBuffer compressed, ByteBuffer uncompressed) 
throws IOException {
+    uncompress(compressed, uncompressed);
+  }
 }
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
index 3a1779588..c78ee09ec 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -19,14 +19,17 @@ package org.apache.parquet.hadoop;
 
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.BROTLI;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4_RAW;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZO;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
@@ -55,13 +58,12 @@ public class TestDirectCodecFactory {
   private void test(int size, CompressionCodecName codec, boolean 
useOnHeapCompression, Decompression decomp) {
     try (TrackingByteBufferAllocator allocator = 
TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
         ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
-      final CodecFactory codecFactory =
+      final CodecFactory directCodecFactory =
           CodecFactory.createDirectCodecFactory(new Configuration(), 
allocator, pageSize);
+      final CodecFactory heapCodecFactory = new CodecFactory(new 
Configuration(), pageSize);
       ByteBuffer rawBuf = allocator.allocate(size);
       releaser.releaseLater(rawBuf);
       final byte[] rawArr = new byte[size];
-      ByteBuffer outBuf = allocator.allocate(size * 2);
-      releaser.releaseLater(outBuf);
       final Random r = new Random();
       final byte[] random = new byte[1024];
       int pos = 0;
@@ -73,56 +75,51 @@ public class TestDirectCodecFactory {
       }
       rawBuf.flip();
 
-      final BytesInputCompressor c = codecFactory.getCompressor(codec);
-      final BytesInputDecompressor d = codecFactory.getDecompressor(codec);
+      final BytesInputCompressor directCompressor = 
directCodecFactory.getCompressor(codec);
+      final BytesInputDecompressor directDecompressor = 
directCodecFactory.getDecompressor(codec);
+      final BytesInputCompressor heapCompressor = 
heapCodecFactory.getCompressor(codec);
+      final BytesInputDecompressor heapDecompressor = 
heapCodecFactory.getDecompressor(codec);
 
-      final BytesInput compressed;
+      if (codec == LZ4_RAW) {
+        // Hadoop codecs support direct decompressors only if the related 
native libraries are available.
+        // This is not the case for our CI so let's rely on LZ4_RAW where the 
implementation is our own.
+        Assert.assertTrue(
+            String.format("The hadoop codec %s should support direct 
decompression", codec),
+            directDecompressor instanceof 
DirectCodecFactory.FullDirectDecompressor);
+      }
+
+      final BytesInput directCompressed;
       if (useOnHeapCompression) {
-        compressed = c.compress(BytesInput.from(rawArr));
+        directCompressed = directCompressor.compress(BytesInput.from(rawArr));
       } else {
-        compressed = c.compress(BytesInput.from(rawBuf));
+        directCompressed = directCompressor.compress(BytesInput.from(rawBuf));
       }
 
-      switch (decomp) {
-        case OFF_HEAP: {
-          final ByteBuffer buf = compressed.toByteBuffer();
-          final ByteBuffer b = allocator.allocate(buf.capacity());
-          try {
-            b.put(buf);
-            b.flip();
-            d.decompress(b, (int) compressed.size(), outBuf, size);
-            for (int i = 0; i < size; i++) {
-              Assert.assertTrue("Data didn't match at " + i, outBuf.get(i) == 
rawBuf.get(i));
-            }
-          } finally {
-            allocator.release(b);
-          }
-          break;
-        }
+      BytesInput heapCompressed = 
heapCompressor.compress(BytesInput.from(rawArr));
 
-        case OFF_HEAP_BYTES_INPUT: {
-          final ByteBuffer buf = compressed.toByteBuffer();
-          final ByteBuffer b = allocator.allocate(buf.limit());
-          try {
-            b.put(buf);
-            b.flip();
-            final BytesInput input = d.decompress(BytesInput.from(b), size);
-            Assert.assertArrayEquals(
-                String.format("While testing codec %s", codec), 
input.toByteArray(), rawArr);
-          } finally {
-            allocator.release(b);
-          }
-          break;
-        }
-        case ON_HEAP: {
-          final byte[] buf = compressed.toByteArray();
-          final BytesInput input = d.decompress(BytesInput.from(buf), size);
-          Assert.assertArrayEquals(input.toByteArray(), rawArr);
-          break;
-        }
-      }
-      c.release();
-      d.release();
+      // Validate direct => direct
+      validateDecompress(
+          size,
+          codec,
+          decomp,
+          directCompressed.copy(releaser),
+          allocator,
+          directDecompressor,
+          rawBuf,
+          rawArr);
+
+      // Validate heap => direct
+      validateDecompress(size, codec, decomp, heapCompressed, allocator, 
directDecompressor, rawBuf, rawArr);
+
+      // Validate direct => heap
+      validateDecompress(size, codec, decomp, directCompressed, allocator, 
heapDecompressor, rawBuf, rawArr);
+
+      directCompressor.release();
+      directDecompressor.release();
+      directCodecFactory.release();
+      heapCompressor.release();
+      heapDecompressor.release();
+      heapCodecFactory.release();
     } catch (Exception e) {
       final String msg = String.format(
           "Failure while testing Codec: %s, OnHeapCompressionInput: %s, 
Decompression Mode: %s, Data Size: %d",
@@ -132,6 +129,69 @@ public class TestDirectCodecFactory {
     }
   }
 
+  private static void validateDecompress(
+      int size,
+      CompressionCodecName codec,
+      Decompression decomp,
+      BytesInput compressed,
+      ByteBufferAllocator allocator,
+      BytesInputDecompressor d,
+      ByteBuffer rawBuf,
+      byte[] rawArr)
+      throws IOException {
+    switch (decomp) {
+      case OFF_HEAP: {
+        final ByteBuffer buf = compressed.toByteBuffer();
+        final ByteBuffer b = allocator.allocate(buf.capacity() + 20);
+        final ByteBuffer outBuf = allocator.allocate(size + 20);
+        final int shift = 10;
+        try {
+          b.position(shift);
+          b.put(buf);
+          b.position(shift);
+          outBuf.position(shift);
+          d.decompress(b, (int) compressed.size(), outBuf, size);
+          Assert.assertEquals(
+              "Input buffer position mismatch for codec " + codec,
+              compressed.size() + shift,
+              b.position());
+          Assert.assertEquals(
+              "Output buffer position mismatch for codec " + codec, size + 
shift, outBuf.position());
+          for (int i = 0; i < size; i++) {
+            Assert.assertTrue(
+                String.format("Data didn't match at %d, while testing codec 
%s", i, codec),
+                outBuf.get(shift + i) == rawBuf.get(i));
+          }
+        } finally {
+          allocator.release(b);
+          allocator.release(outBuf);
+        }
+        break;
+      }
+
+      case OFF_HEAP_BYTES_INPUT: {
+        final ByteBuffer buf = compressed.toByteBuffer();
+        final ByteBuffer b = allocator.allocate(buf.limit());
+        try {
+          b.put(buf);
+          b.flip();
+          final BytesInput input = d.decompress(BytesInput.from(b), size);
+          Assert.assertArrayEquals(
+              String.format("While testing codec %s", codec), 
input.toByteArray(), rawArr);
+        } finally {
+          allocator.release(b);
+        }
+        break;
+      }
+      case ON_HEAP: {
+        final byte[] buf = compressed.toByteArray();
+        final BytesInput input = d.decompress(BytesInput.from(buf), size);
+        Assert.assertArrayEquals(String.format("While testing codec %s", 
codec), input.toByteArray(), rawArr);
+        break;
+      }
+    }
+  }
+
   @Test
   public void createDirectFactoryWithHeapAllocatorFails() {
     String errorMsg =


Reply via email to