This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d53f98eb PARQUET-2440: Avoid getting Hadoop codec for internal compressor/decompressor (#1286)
4d53f98eb is described below

commit 4d53f98eb479794408ad04856b3424bbacfba45a
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Mon Mar 4 09:27:26 2024 +0100

    PARQUET-2440: Avoid getting Hadoop codec for internal compressor/decompressor (#1286)
---
 .../org/apache/parquet/hadoop/CodecFactory.java    | 120 ++++++++++++---------
 .../apache/parquet/hadoop/DirectCodecFactory.java  |  73 ++++++++-----
 2 files changed, 115 insertions(+), 78 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 9df1f7e5e..98c7002fe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -32,6 +33,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.compression.CompressionCodecFactory;
@@ -52,6 +54,40 @@ public class CodecFactory implements CompressionCodecFactory 
{
   protected final ParquetConfiguration configuration;
   protected final int pageSize;
 
+  static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize) {
+      Preconditions.checkArgument(
+          compressedSize == uncompressedSize,
+          "Non-compressed data did not have matching compressed and 
uncompressed sizes.");
+      output.clear();
+      output.put((ByteBuffer) 
input.duplicate().position(0).limit(compressedSize));
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) {
+      return bytes;
+    }
+
+    @Override
+    public void release() {}
+  };
+
+  static final BytesCompressor NO_OP_COMPRESSOR = new BytesCompressor() {
+    @Override
+    public BytesInput compress(BytesInput bytes) {
+      return bytes;
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.UNCOMPRESSED;
+    }
+
+    @Override
+    public void release() {}
+  };
+
   /**
    * Create a new codec factory.
    *
@@ -108,37 +144,28 @@ public class CodecFactory implements 
CompressionCodecFactory {
     private final CompressionCodec codec;
     private final Decompressor decompressor;
 
-    HeapBytesDecompressor(CompressionCodecName codecName) {
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
+    HeapBytesDecompressor(CompressionCodec codec) {
+      this.codec = Objects.requireNonNull(codec);
+      decompressor = CodecPool.getDecompressor(codec);
     }
 
     @Override
     public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
       final BytesInput decompressed;
-      if (codec != null) {
-        if (decompressor != null) {
-          decompressor.reset();
-        }
-        InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor);
-
-        // We need to explicitly close the ZstdDecompressorStream here to 
release the resources it holds to
-        // avoid
-        // off-heap memory fragmentation issue, see 
https://issues.apache.org/jira/browse/PARQUET-2160.
-        // This change will load the decompressor stream into heap a little 
earlier, since the problem it solves
-        // only happens in the ZSTD codec, so this modification is only made 
for ZSTD streams.
-        if (codec instanceof ZstandardCodec) {
-          decompressed = BytesInput.copy(BytesInput.from(is, 
uncompressedSize));
-          is.close();
-        } else {
-          decompressed = BytesInput.from(is, uncompressedSize);
-        }
+      if (decompressor != null) {
+        decompressor.reset();
+      }
+      InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor);
+
+      // We need to explicitly close the ZstdDecompressorStream here to 
release the resources it holds to
+      // avoid off-heap memory fragmentation issue, see 
https://issues.apache.org/jira/browse/PARQUET-2160.
+      // This change will load the decompressor stream into heap a little 
earlier, since the problem it solves
+      // only happens in the ZSTD codec, so this modification is only made for 
ZSTD streams.
+      if (codec instanceof ZstandardCodec) {
+        decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+        is.close();
       } else {
-        decompressed = bytes;
+        decompressed = BytesInput.from(is, uncompressedSize);
       }
       return decompressed;
     }
@@ -168,36 +195,25 @@ public class CodecFactory implements 
CompressionCodecFactory {
     private final ByteArrayOutputStream compressedOutBuffer;
     private final CompressionCodecName codecName;
 
-    HeapBytesCompressor(CompressionCodecName codecName) {
+    HeapBytesCompressor(CompressionCodecName codecName, CompressionCodec 
codec) {
       this.codecName = codecName;
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        this.compressor = CodecPool.getCompressor(codec);
-        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
-      } else {
-        this.compressor = null;
-        this.compressedOutBuffer = null;
-      }
+      this.codec = Objects.requireNonNull(codec);
+      this.compressor = CodecPool.getCompressor(codec);
+      this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
     }
 
     @Override
     public BytesInput compress(BytesInput bytes) throws IOException {
-      final BytesInput compressedBytes;
-      if (codec == null) {
-        compressedBytes = bytes;
-      } else {
-        compressedOutBuffer.reset();
-        if (compressor != null) {
-          // null compressor for non-native gzip
-          compressor.reset();
-        }
-        try (CompressionOutputStream cos = 
codec.createOutputStream(compressedOutBuffer, compressor)) {
-          bytes.writeAllTo(cos);
-          cos.finish();
-        }
-        compressedBytes = BytesInput.from(compressedOutBuffer);
+      compressedOutBuffer.reset();
+      if (compressor != null) {
+        // null compressor for non-native gzip
+        compressor.reset();
+      }
+      try (CompressionOutputStream cos = 
codec.createOutputStream(compressedOutBuffer, compressor)) {
+        bytes.writeAllTo(cos);
+        cos.finish();
       }
-      return compressedBytes;
+      return BytesInput.from(compressedOutBuffer);
     }
 
     @Override
@@ -233,11 +249,13 @@ public class CodecFactory implements 
CompressionCodecFactory {
   }
 
   protected BytesCompressor createCompressor(CompressionCodecName codecName) {
-    return new HeapBytesCompressor(codecName);
+    CompressionCodec codec = getCodec(codecName);
+    return codec == null ? NO_OP_COMPRESSOR : new 
HeapBytesCompressor(codecName, codec);
   }
 
   protected BytesDecompressor createDecompressor(CompressionCodecName 
codecName) {
-    return new HeapBytesDecompressor(codecName);
+    CompressionCodec codec = getCodec(codecName);
+    return codec == null ? NO_OP_DECOMPRESSOR : new 
HeapBytesDecompressor(codec);
   }
 
   /**
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index 3e2ad10b1..f509dce55 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -96,35 +96,37 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
 
   @Override
   protected BytesCompressor createCompressor(final CompressionCodecName 
codecName) {
-
-    CompressionCodec codec = getCodec(codecName);
-    if (codec == null) {
-      return new NoopCompressor();
-    } else if (codecName == CompressionCodecName.SNAPPY) {
-      // avoid using the default Snappy codec since it allocates direct 
buffers at awkward spots.
-      return new SnappyCompressor();
-    } else if (codecName == CompressionCodecName.ZSTD) {
-      return new ZstdCompressor();
-    } else {
-      // todo: create class similar to the SnappyCompressor for zlib and 
exclude it as
-      // snappy is above since it also generates allocateDirect calls.
-      return new HeapBytesCompressor(codecName);
+    switch (codecName) {
+      case SNAPPY:
+        // avoid using the default Snappy codec since it allocates direct 
buffers at awkward spots.
+        return new SnappyCompressor();
+      case ZSTD:
+        return new ZstdCompressor();
+        // todo: create class similar to the SnappyCompressor for zlib and 
exclude it as
+        // snappy is above since it also generates allocateDirect calls.
+      default:
+        return super.createCompressor(codecName);
     }
   }
 
   @Override
   protected BytesDecompressor createDecompressor(final CompressionCodecName 
codecName) {
-    CompressionCodec codec = getCodec(codecName);
-    if (codec == null) {
-      return new NoopDecompressor();
-    } else if (codecName == CompressionCodecName.SNAPPY) {
-      return new SnappyDecompressor();
-    } else if (codecName == CompressionCodecName.ZSTD) {
-      return new ZstdDecompressor();
-    } else if 
(DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) {
-      return new FullDirectDecompressor(codecName);
-    } else {
-      return new IndirectDecompressor(codec);
+    switch (codecName) {
+      case SNAPPY:
+        return new SnappyDecompressor();
+      case ZSTD:
+        return new ZstdDecompressor();
+      default:
+        CompressionCodec codec = getCodec(codecName);
+        if (codec == null) {
+          return NO_OP_DECOMPRESSOR;
+        }
+        DirectCodecPool.CodecPool pool = DirectCodecPool.INSTANCE.codec(codec);
+        if (pool.supportsDirectDecompression()) {
+          return new FullDirectDecompressor(pool.borrowDirectDecompressor());
+        } else {
+          return new IndirectDecompressor(pool.borrowDecompressor());
+        }
     }
   }
 
@@ -140,7 +142,11 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     private final Decompressor decompressor;
 
     public IndirectDecompressor(CompressionCodec codec) {
-      this.decompressor = 
DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor();
+      this(DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor());
+    }
+
+    private IndirectDecompressor(Decompressor decompressor) {
+      this.decompressor = decompressor;
     }
 
     @Override
@@ -280,8 +286,13 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     private final Object decompressor;
 
     public FullDirectDecompressor(CompressionCodecName codecName) {
-      CompressionCodec codec = getCodec(codecName);
-      this.decompressor = 
DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor();
+      this(DirectCodecPool.INSTANCE
+          .codec(Objects.requireNonNull(getCodec(codecName)))
+          .borrowDirectDecompressor());
+    }
+
+    private FullDirectDecompressor(Object decompressor) {
+      this.decompressor = decompressor;
     }
 
     @Override
@@ -320,6 +331,10 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     }
   }
 
+  /**
+   * @deprecated Use {@link CodecFactory#NO_OP_DECOMPRESSOR} instead
+   */
+  @Deprecated
   public class NoopDecompressor extends BytesDecompressor {
 
     @Override
@@ -424,6 +439,10 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
     }
   }
 
+  /**
+   * @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead
+   */
+  @Deprecated
   public static class NoopCompressor extends BytesCompressor {
 
     public NoopCompressor() {}

Reply via email to