This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new c839292d8 PARQUET-2444: Define/enforce contract for codecs (#1292)
c839292d8 is described below
commit c839292d8f4545ec9193c688f39305b8e66aa502
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Thu Mar 7 09:33:14 2024 +0100
PARQUET-2444: Define/enforce contract for codecs (#1292)
* Added javadoc comments to specify the contracts in
CompressionCodecFactory
* Updated the related unit test to validate the contract on the different implementations
* Made Lz4RawCodec implement direct decompression so we can also cover this scenario in the unit test
Fixed issues discovered by the new tests:
* Properly set the ByteBuffer indices for the related implementations
according to the defined contract
* Fix issue in DirectCodecFactory where FullDirectDecompressor never worked
* Fix ParquetWriter builder to pass through the codec factory
---
.../compression/CompressionCodecFactory.java | 113 ++++++++++++++-
.../org/apache/parquet/hadoop/CodecFactory.java | 37 +++--
.../parquet/hadoop/ColumnChunkPageReadStore.java | 19 +--
.../apache/parquet/hadoop/DirectCodecFactory.java | 96 ++++++-------
.../org/apache/parquet/hadoop/ParquetWriter.java | 2 +
.../apache/parquet/hadoop/codec/Lz4RawCodec.java | 9 +-
.../parquet/hadoop/codec/Lz4RawDecompressor.java | 8 +-
.../parquet/hadoop/TestDirectCodecFactory.java | 154 ++++++++++++++-------
8 files changed, 310 insertions(+), 128 deletions(-)
diff --git
a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
index 4f31a3f8a..561dcb899 100644
---
a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
+++
b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
@@ -24,28 +24,137 @@ import java.nio.ByteBuffer;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+/**
+ * Factory for creating (and potentially caching) {@link BytesInputCompressor}
and {@link BytesInputDecompressor}
+ * instances to compress/decompress page data.
+ * <p>
+ * The factory instance shall be released after use. The
compressor/decompressor instances shall not be used after
+ * release.
+ *
+ * @see #release()
+ */
public interface CompressionCodecFactory {
+ /**
+ * Returns a {@link BytesInputCompressor} instance for the specified codec
name to be used for compressing page data.
+ * <p>
+ * The compressor is not thread-safe, so one instance for each working
thread is required.
+ *
+ * @param codecName the codec name for which the compressor instance is to be returned
+ * @return the compressor instance for the specified codec name
+ * @see BytesInputCompressor#release()
+ */
BytesInputCompressor getCompressor(CompressionCodecName codecName);
+ /**
+ * Returns a {@link BytesInputDecompressor} instance for the specified codec
name to be used for decompressing page
+ * data.
+ * <p>
+ * The decompressor is not thread-safe, so one instance for each working
thread is required.
+ *
+ * @param codecName the codec name for which the decompressor instance is to be returned
+ * @return the decompressor instance for the specified codec name
+ * @see BytesInputDecompressor#release()
+ */
BytesInputDecompressor getDecompressor(CompressionCodecName codecName);
+ /**
+ * Releases this factory instance.
+ * <p>
+ * Each compressor/decompressor instance shall be released before invoking this. Neither the compressor/decompressor
+ * instances retrieved from this factory nor this factory instance itself shall be used after release.
+ *
+ * @see BytesInputCompressor#release()
+ * @see BytesInputDecompressor#release()
+ */
void release();
+ /**
+ * Compressor instance of a specific codec to be used for compressing page
data.
+ * <p>
+ * This compressor shall be released after use. This compressor shall not be
used after release.
+ *
+ * @see #release()
+ */
interface BytesInputCompressor {
+
+ /**
+ * Compresses the specified {@link BytesInput} data and returns it as
{@link BytesInput}.
+ * <p>
+ * Depending on the implementation, {@code bytes} might be completely consumed. The returned {@link BytesInput}
+ * instance needs to be consumed before using this compressor again. This
is because the implementation might use
+ * its internal buffer to directly provide the returned {@link BytesInput}
instance.
+ *
+ * @param bytes the page data to be compressed
+ * @return a {@link BytesInput} containing the compressed data. Needs to
be consumed before using this compressor
+ * again.
+ * @throws IOException if any I/O error occurs during the compression
+ */
BytesInput compress(BytesInput bytes) throws IOException;
+ /**
+ * Returns the codec name of this compressor.
+ *
+ * @return the codec name
+ */
CompressionCodecName getCodecName();
+ /**
+ * Releases this compressor instance.
+ * <p>
+ * Neither this instance nor the {@link BytesInput} instance returned by
+ * {@link #compress(BytesInput)} shall be used after release.
+ */
void release();
}
+ /**
+ * Decompressor instance of a specific codec to be used for decompressing
page data.
+ * <p>
+ * This decompressor shall be released after use. This decompressor shall
not be used after release.
+ *
+ * @see #release()
+ */
interface BytesInputDecompressor {
- BytesInput decompress(BytesInput bytes, int uncompressedSize) throws
IOException;
- void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
int uncompressedSize)
+ /**
+ * Decompresses the specified {@link BytesInput} data and returns it as
{@link BytesInput}.
+ * <p>
+ * The decompressed data must have the size specified. Depending on the implementation, {@code bytes} might be
+ * completely consumed. The returned {@link BytesInput} instance needs to
be consumed before using this decompressor
+ * again. This is because the implementation might use its internal buffer
to directly provide the returned
+ * {@link BytesInput} instance.
+ *
+ * @param bytes the page data to be decompressed
+ * @param decompressedSize the exact size of the decompressed data
+ * @return a {@link BytesInput} containing the decompressed data. Needs to
be consumed before using this
+ * decompressor again.
+ * @throws IOException if any I/O error occurs during the decompression
+ */
+ BytesInput decompress(BytesInput bytes, int decompressedSize) throws
IOException;
+
+ /**
+ * Decompresses {@code compressedSize} bytes from {@code input}, starting at its current position. The decompressed
+ * bytes are written into {@code output}, starting at its current position. The decompressed data must have the size specified.
+ * <p>
+ * {@code output} must have at least {@code decompressedSize} bytes available. Following the {@link ByteBuffer}
+ * contract, the position of {@code input} will be increased by {@code compressedSize}, and the position of
+ * {@code output} will be increased by {@code decompressedSize}. (This means one has to flip the output buffer
+ * before reading the decompressed data from it.)
+ *
+ * @param input the input buffer where the data is to be
decompressed from
+ * @param compressedSize the exact size of the compressed (input) data
+ * @param output the output buffer where the data is to be
decompressed into
+ * @param decompressedSize the exact size of the decompressed (output) data
+ * @throws IOException if any I/O error occurs during the decompression
+ */
+ void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
int decompressedSize)
throws IOException;
+ /**
+ * Releases this decompressor instance. Neither this instance nor the {@link BytesInput}
+ * instance returned by {@link #decompress(BytesInput, int)} shall be used after release.
+ */
void release();
}
}
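To make the contract above concrete, here is a minimal usage sketch (not part of the patch; `factory`, `pageData`, and `uncompressedSize` are illustrative assumptions):

    // Sketch: per-thread usage following the documented contract.
    CompressionCodecFactory.BytesInputCompressor compressor =
        factory.getCompressor(CompressionCodecName.SNAPPY);
    BytesInput compressed = compressor.compress(pageData);
    // The result may be backed by the compressor's internal buffer, so
    // consume it before calling compress() again:
    byte[] compressedBytes = compressed.toByteArray();

    CompressionCodecFactory.BytesInputDecompressor decompressor =
        factory.getDecompressor(CompressionCodecName.SNAPPY);
    BytesInput decompressed =
        decompressor.decompress(BytesInput.from(compressedBytes), uncompressedSize);
    byte[] rawBytes = decompressed.toByteArray();

    // Release order: compressor/decompressor instances first, then the factory.
    compressor.release();
    decompressor.release();
    factory.release();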
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 98c7002fe..f0775484c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -56,16 +56,20 @@ public class CodecFactory implements
CompressionCodecFactory {
static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
@Override
- public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int uncompressedSize) {
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int decompressedSize) {
Preconditions.checkArgument(
- compressedSize == uncompressedSize,
- "Non-compressed data did not have matching compressed and
uncompressed sizes.");
- output.clear();
- output.put((ByteBuffer)
input.duplicate().position(0).limit(compressedSize));
+ compressedSize == decompressedSize,
+ "Non-compressed data did not have matching compressed and
decompressed sizes.");
+ Preconditions.checkArgument(
+ input.remaining() >= compressedSize, "Not enough bytes available in
the input buffer");
+ int origLimit = input.limit();
+ input.limit(input.position() + compressedSize);
+ output.put(input);
+ input.limit(origLimit);
}
@Override
- public BytesInput decompress(BytesInput bytes, int uncompressedSize) {
+ public BytesInput decompress(BytesInput bytes, int decompressedSize) {
return bytes;
}
@@ -150,7 +154,7 @@ public class CodecFactory implements
CompressionCodecFactory {
}
@Override
- public BytesInput decompress(BytesInput bytes, int uncompressedSize)
throws IOException {
+ public BytesInput decompress(BytesInput bytes, int decompressedSize)
throws IOException {
final BytesInput decompressed;
if (decompressor != null) {
decompressor.reset();
@@ -162,20 +166,27 @@ public class CodecFactory implements
CompressionCodecFactory {
// This change will load the decompressor stream into heap a little
earlier, since the problem it solves
// only happens in the ZSTD codec, so this modification is only made for
ZSTD streams.
if (codec instanceof ZstandardCodec) {
- decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+ decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize));
is.close();
} else {
- decompressed = BytesInput.from(is, uncompressedSize);
+ decompressed = BytesInput.from(is, decompressedSize);
}
return decompressed;
}
@Override
- public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int uncompressedSize)
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int decompressedSize)
throws IOException {
+ Preconditions.checkArgument(
+ input.remaining() >= compressedSize, "Not enough bytes available in
the input buffer");
+ int origLimit = input.limit();
+ int origPosition = input.position();
+ input.limit(origPosition + compressedSize);
ByteBuffer decompressed =
- decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
+ decompress(BytesInput.from(input), decompressedSize).toByteBuffer();
output.put(decompressed);
+ input.limit(origLimit);
+ input.position(origPosition + compressedSize);
}
public void release() {
@@ -338,9 +349,9 @@ public class CodecFactory implements
CompressionCodecFactory {
*/
@Deprecated
public abstract static class BytesDecompressor implements
CompressionCodecFactory.BytesInputDecompressor {
- public abstract BytesInput decompress(BytesInput bytes, int
uncompressedSize) throws IOException;
+ public abstract BytesInput decompress(BytesInput bytes, int
decompressedSize) throws IOException;
- public abstract void decompress(ByteBuffer input, int compressedSize,
ByteBuffer output, int uncompressedSize)
+ public abstract void decompress(ByteBuffer input, int compressedSize,
ByteBuffer output, int decompressedSize)
throws IOException;
public abstract void release();
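The ByteBuffer variant of the contract can be sketched as follows (illustrative; `decompressor`, `input`, `compressedSize`, and `decompressedSize` are assumptions, not part of the patch):

    // Sketch: ByteBuffer-based decompression per the defined contract.
    // `input` holds compressedSize bytes starting at its current position.
    ByteBuffer output = ByteBuffer.allocateDirect(decompressedSize);
    decompressor.decompress(input, compressedSize, output, decompressedSize);
    // Per the contract, both positions have advanced:
    //   input.position()  by compressedSize
    //   output.position() by decompressedSize
    output.flip(); // flip before reading the decompressed data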
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index cc20c1562..c7fc22b29 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -168,12 +168,7 @@ class ColumnChunkPageReadStore implements PageReadStore,
DictionaryPageReadStore
decompressedBuffer,
dataPageV1.getUncompressedSize());
setDecompressMetrics(bytes, start);
-
- // HACKY: sometimes we need to do `flip` because the position of
output bytebuffer is
- // not reset.
- if (decompressedBuffer.position() != 0) {
- decompressedBuffer.flip();
- }
+ decompressedBuffer.flip();
decompressed = BytesInput.from(decompressedBuffer);
} else { // use on-heap buffer
if (null != blockDecryptor) {
@@ -225,9 +220,6 @@ class ColumnChunkPageReadStore implements PageReadStore,
DictionaryPageReadStore
}
BytesInput pageBytes = dataPageV2.getData();
try {
- BytesInput decompressed;
- long compressedSize;
-
if (options.getAllocator().isDirect() &&
options.useOffHeapDecryptBuffer()) {
ByteBuffer byteBuffer = pageBytes.toByteBuffer(releaser);
if (!byteBuffer.isDirect()) {
@@ -236,7 +228,7 @@ class ColumnChunkPageReadStore implements PageReadStore,
DictionaryPageReadStore
if (blockDecryptor != null) {
byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
}
- compressedSize = byteBuffer.limit();
+ long compressedSize = byteBuffer.limit();
if (dataPageV2.isCompressed()) {
int uncompressedSize =
Math.toIntExact(dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
@@ -248,12 +240,7 @@ class ColumnChunkPageReadStore implements PageReadStore,
DictionaryPageReadStore
decompressor.decompress(
byteBuffer, (int) compressedSize, decompressedBuffer,
uncompressedSize);
setDecompressMetrics(pageBytes, start);
-
- // HACKY: sometimes we need to do `flip` because the position
of output bytebuffer is
- // not reset.
- if (decompressedBuffer.position() != 0) {
- decompressedBuffer.flip();
- }
+ decompressedBuffer.flip();
pageBytes = BytesInput.from(decompressedBuffer);
} else {
pageBytes = BytesInput.from(byteBuffer);
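With the contract now enforced, the output buffer's position always equals the decompressed size after the call, so the conditional "HACKY" workaround is replaced by an unconditional flip. A sketch of the resulting buffer state (illustrative):

    // After decompress(), per the contract:
    //   decompressedBuffer.position() == uncompressedSize
    // flip() then prepares the buffer for reading:
    //   limit == uncompressedSize, position == 0
    decompressedBuffer.flip();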
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index f509dce55..523e57dbf 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -68,7 +68,8 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
try {
tempClass =
Class.forName("org.apache.hadoop.io.compress.DirectDecompressionCodec");
tempCreateMethod = tempClass.getMethod("createDirectDecompressor");
- tempDecompressMethod = tempClass.getMethod("decompress",
ByteBuffer.class, ByteBuffer.class);
+ Class<?> tempClass2 =
Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
+ tempDecompressMethod = tempClass2.getMethod("decompress",
ByteBuffer.class, ByteBuffer.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
// do nothing, the class will just be assigned null
}
@@ -150,27 +151,25 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
}
@Override
- public BytesInput decompress(BytesInput bytes, int uncompressedSize)
throws IOException {
+ public BytesInput decompress(BytesInput bytes, int decompressedSize)
throws IOException {
decompressor.reset();
byte[] inputBytes = bytes.toByteArray();
decompressor.setInput(inputBytes, 0, inputBytes.length);
- byte[] output = new byte[uncompressedSize];
- decompressor.decompress(output, 0, uncompressedSize);
+ byte[] output = new byte[decompressedSize];
+ decompressor.decompress(output, 0, decompressedSize);
return BytesInput.from(output);
}
@Override
- public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int uncompressedSize)
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int decompressedSize)
throws IOException {
decompressor.reset();
byte[] inputBytes = new byte[compressedSize];
- input.position(0);
input.get(inputBytes);
decompressor.setInput(inputBytes, 0, inputBytes.length);
- byte[] outputBytes = new byte[uncompressedSize];
- decompressor.decompress(outputBytes, 0, uncompressedSize);
- output.clear();
+ byte[] outputBytes = new byte[decompressedSize];
+ decompressor.decompress(outputBytes, 0, decompressedSize);
output.put(outputBytes);
}
@@ -193,14 +192,14 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
}
@Override
- public BytesInput decompress(BytesInput bytes, int uncompressedSize)
throws IOException {
+ public BytesInput decompress(BytesInput bytes, int decompressedSize)
throws IOException {
try (ByteBufferReleaser releaser = inputAllocator.getReleaser()) {
ByteBuffer input = bytes.toByteBuffer(releaser);
- ByteBuffer output = outputAllocator.allocate(uncompressedSize);
+ ByteBuffer output = outputAllocator.allocate(decompressedSize);
int size = decompress(input.slice(), output.slice());
- if (size != uncompressedSize) {
+ if (size != decompressedSize) {
throw new DirectCodecPool.ParquetCompressionCodecException(
- "Unexpected decompressed size: " + size + " != " +
uncompressedSize);
+ "Unexpected decompressed size: " + size + " != " +
decompressedSize);
}
output.limit(size);
return BytesInput.from(output);
@@ -210,26 +209,26 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
abstract int decompress(ByteBuffer input, ByteBuffer output) throws
IOException;
@Override
- public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int uncompressedSize)
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int decompressedSize)
throws IOException {
+ int origInputLimit = input.limit();
input.limit(input.position() + compressedSize);
- output.limit(output.position() + uncompressedSize);
+ int origOutputLimit = output.limit();
+ output.limit(output.position() + decompressedSize);
int size = decompress(input.slice(), output.slice());
- if (size != uncompressedSize) {
+ if (size != decompressedSize) {
throw new DirectCodecPool.ParquetCompressionCodecException(
- "Unexpected decompressed size: " + size + " != " +
uncompressedSize);
+ "Unexpected decompressed size: " + size + " != " +
decompressedSize);
}
input.position(input.limit());
- output.position(output.position() + uncompressedSize);
+ input.limit(origInputLimit);
+ output.position(output.limit());
+ output.limit(origOutputLimit);
}
@Override
public void release() {
- try {
- AutoCloseables.uncheckedClose(outputAllocator, inputAllocator);
- } finally {
- closeDecompressor();
- }
+ AutoCloseables.uncheckedClose(outputAllocator, inputAllocator,
this::closeDecompressor);
}
abstract void closeDecompressor();
@@ -264,11 +263,7 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
@Override
public void release() {
- try {
- AutoCloseables.uncheckedClose(outputAllocator, inputAllocator);
- } finally {
- closeCompressor();
- }
+ AutoCloseables.uncheckedClose(outputAllocator, inputAllocator,
this::closeCompressor);
}
abstract void closeCompressor();
@@ -296,22 +291,22 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
}
@Override
- public BytesInput decompress(BytesInput compressedBytes, int
uncompressedSize) throws IOException {
+ public BytesInput decompress(BytesInput compressedBytes, int
decompressedSize) throws IOException {
// Similarly to non-direct decompressors, we reset before use, if
possible (see HeapBytesDecompressor)
if (decompressor instanceof Decompressor) {
((Decompressor) decompressor).reset();
}
- return super.decompress(compressedBytes, uncompressedSize);
+ return super.decompress(compressedBytes, decompressedSize);
}
@Override
- public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int uncompressedSize)
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int decompressedSize)
throws IOException {
// Similarly to non-direct decompressors, we reset before use, if
possible (see HeapBytesDecompressor)
if (decompressor instanceof Decompressor) {
((Decompressor) decompressor).reset();
}
- super.decompress(input, compressedSize, output, uncompressedSize);
+ super.decompress(input, compressedSize, output, decompressedSize);
}
@Override
@@ -322,7 +317,10 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
} catch (IllegalAccessException | InvocationTargetException e) {
throw new DirectCodecPool.ParquetCompressionCodecException(e);
}
- return output.position() - startPos;
+ int size = output.position() - startPos;
+ // Some decompressors flip the output buffer, some don't:
+ // Let's rely on the limit if the position did not change
+ return size == 0 ? output.limit() : size;
}
@Override
@@ -338,22 +336,20 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
public class NoopDecompressor extends BytesDecompressor {
@Override
- public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int uncompressedSize)
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int decompressedSize)
throws IOException {
- Preconditions.checkArgument(
- compressedSize == uncompressedSize,
- "Non-compressed data did not have matching compressed and
uncompressed sizes.");
- output.clear();
- output.put((ByteBuffer)
input.duplicate().position(0).limit(compressedSize));
+ NO_OP_DECOMPRESSOR.decompress(input, compressedSize, output,
decompressedSize);
}
@Override
- public BytesInput decompress(BytesInput bytes, int uncompressedSize)
throws IOException {
- return bytes;
+ public BytesInput decompress(BytesInput bytes, int decompressedSize)
throws IOException {
+ return NO_OP_DECOMPRESSOR.decompress(bytes, decompressedSize);
}
@Override
- public void release() {}
+ public void release() {
+ NO_OP_DECOMPRESSOR.release();
+ }
}
public class SnappyDecompressor extends BaseDecompressor {
@@ -416,6 +412,8 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
context = new ZstdCompressCtx();
context.setLevel(configuration.getInt(
ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL,
ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL));
+ context.setWorkers(configuration.getInt(
+ ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS,
ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
}
@Override
@@ -449,16 +447,18 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
@Override
public BytesInput compress(BytesInput bytes) throws IOException {
- return bytes;
+ return NO_OP_COMPRESSOR.compress(bytes);
}
@Override
public CompressionCodecName getCodecName() {
- return CompressionCodecName.UNCOMPRESSED;
+ return NO_OP_COMPRESSOR.getCodecName();
}
@Override
- public void release() {}
+ public void release() {
+ NO_OP_COMPRESSOR.release();
+ }
}
static class DirectCodecPool {
@@ -486,7 +486,8 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
private CodecPool(final CompressionCodec codec) {
try {
- boolean supportDirectDecompressor = codec.getClass() ==
DIRECT_DECOMPRESSION_CODEC_CLASS;
+ boolean supportDirectDecompressor = DIRECT_DECOMPRESSION_CODEC_CLASS
!= null
+ &&
DIRECT_DECOMPRESSION_CODEC_CLASS.isAssignableFrom(codec.getClass());
compressorPool = new GenericObjectPool(
new BasePoolableObjectFactory() {
public Object makeObject() throws Exception {
@@ -533,8 +534,7 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
directDecompressorPool = new GenericObjectPool(
new BasePoolableObjectFactory() {
public Object makeObject() throws Exception {
- return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(
- DIRECT_DECOMPRESSION_CODEC_CLASS);
+ return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(codec);
}
},
Integer.MAX_VALUE);
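For context, a sketch of why FullDirectDecompressor never worked before this change (illustrative; simplified from the reflective code above, with `directCodecClass` and `codec` as assumptions):

    // createDirectDecompressor() is an instance method of the codec, so the
    // reflective receiver must be a codec instance:
    Method create = directCodecClass.getMethod("createDirectDecompressor");
    Object direct = create.invoke(codec); // the fixed call
    // The old call passed the interface's Class object as the receiver and
    // threw IllegalArgumentException. Likewise, the old check
    // codec.getClass() == DIRECT_DECOMPRESSION_CODEC_CLASS compared a concrete
    // class to the interface and was always false; isAssignableFrom() is the
    // correct test.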
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 22dc7e30f..a76b843e7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -916,6 +916,7 @@ public class ParquetWriter<T> implements Closeable {
mode,
getWriteSupport(conf),
codecName,
+ codecFactory,
rowGroupSize,
enableValidation,
conf,
@@ -928,6 +929,7 @@ public class ParquetWriter<T> implements Closeable {
mode,
getWriteSupport(conf),
codecName,
+ codecFactory,
rowGroupSize,
enableValidation,
conf,
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
index fd8c1a81e..6a25c5982 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressor;
/**
* Lz4 raw compression codec for Parquet. This codec type has been introduced
@@ -39,7 +41,7 @@ import org.apache.hadoop.io.compress.Decompressor;
* below for reference.
* https://github.com/apache/parquet-format/blob/master/Compression.md
*/
-public class Lz4RawCodec implements Configurable, CompressionCodec {
+public class Lz4RawCodec implements Configurable, CompressionCodec,
DirectDecompressionCodec {
private Configuration conf;
@@ -68,6 +70,11 @@ public class Lz4RawCodec implements Configurable,
CompressionCodec {
return new Lz4RawDecompressor();
}
+ @Override
+ public DirectDecompressor createDirectDecompressor() {
+ return new Lz4RawDecompressor();
+ }
+
@Override
public CompressionInputStream createInputStream(InputStream stream) throws
IOException {
return createInputStream(stream, createDecompressor());
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
index 7477bda87..68839d281 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
@@ -21,8 +21,9 @@ package org.apache.parquet.hadoop.codec;
import io.airlift.compress.lz4.Lz4Decompressor;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.io.compress.DirectDecompressor;
-public class Lz4RawDecompressor extends NonBlockedDecompressor {
+public class Lz4RawDecompressor extends NonBlockedDecompressor implements
DirectDecompressor {
private Lz4Decompressor decompressor = new Lz4Decompressor();
@@ -41,4 +42,9 @@ public class Lz4RawDecompressor extends
NonBlockedDecompressor {
uncompressed.rewind();
return uncompressedSize;
}
+
+ @Override
+ public void decompress(ByteBuffer compressed, ByteBuffer uncompressed)
throws IOException {
+ uncompress(compressed, uncompressed);
+ }
}
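With Lz4RawCodec now exposing a DirectDecompressor, buffer-to-buffer decompression can be exercised roughly like this (illustrative; `compressed` and `uncompressed` are assumed to be pre-filled direct ByteBuffers):

    // Sketch: direct decompression with the new Lz4Raw DirectDecompressor.
    Lz4RawCodec codec = new Lz4RawCodec();
    DirectDecompressor direct = codec.createDirectDecompressor();
    direct.decompress(compressed, uncompressed); // fills `uncompressed`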
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
index 3a1779588..c78ee09ec 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -19,14 +19,17 @@ package org.apache.parquet.hadoop;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.BROTLI;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4_RAW;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZO;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
@@ -55,13 +58,12 @@ public class TestDirectCodecFactory {
private void test(int size, CompressionCodecName codec, boolean
useOnHeapCompression, Decompression decomp) {
try (TrackingByteBufferAllocator allocator =
TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
- final CodecFactory codecFactory =
+ final CodecFactory directCodecFactory =
CodecFactory.createDirectCodecFactory(new Configuration(),
allocator, pageSize);
+ final CodecFactory heapCodecFactory = new CodecFactory(new
Configuration(), pageSize);
ByteBuffer rawBuf = allocator.allocate(size);
releaser.releaseLater(rawBuf);
final byte[] rawArr = new byte[size];
- ByteBuffer outBuf = allocator.allocate(size * 2);
- releaser.releaseLater(outBuf);
final Random r = new Random();
final byte[] random = new byte[1024];
int pos = 0;
@@ -73,56 +75,51 @@ public class TestDirectCodecFactory {
}
rawBuf.flip();
- final BytesInputCompressor c = codecFactory.getCompressor(codec);
- final BytesInputDecompressor d = codecFactory.getDecompressor(codec);
+ final BytesInputCompressor directCompressor =
directCodecFactory.getCompressor(codec);
+ final BytesInputDecompressor directDecompressor =
directCodecFactory.getDecompressor(codec);
+ final BytesInputCompressor heapCompressor =
heapCodecFactory.getCompressor(codec);
+ final BytesInputDecompressor heapDecompressor =
heapCodecFactory.getDecompressor(codec);
- final BytesInput compressed;
+ if (codec == LZ4_RAW) {
+ // Hadoop codecs support direct decompressors only if the related
native libraries are available.
+ // This is not the case for our CI so let's rely on LZ4_RAW where the
implementation is our own.
+ Assert.assertTrue(
+ String.format("The hadoop codec %s should support direct
decompression", codec),
+ directDecompressor instanceof
DirectCodecFactory.FullDirectDecompressor);
+ }
+
+ final BytesInput directCompressed;
if (useOnHeapCompression) {
- compressed = c.compress(BytesInput.from(rawArr));
+ directCompressed = directCompressor.compress(BytesInput.from(rawArr));
} else {
- compressed = c.compress(BytesInput.from(rawBuf));
+ directCompressed = directCompressor.compress(BytesInput.from(rawBuf));
}
- switch (decomp) {
- case OFF_HEAP: {
- final ByteBuffer buf = compressed.toByteBuffer();
- final ByteBuffer b = allocator.allocate(buf.capacity());
- try {
- b.put(buf);
- b.flip();
- d.decompress(b, (int) compressed.size(), outBuf, size);
- for (int i = 0; i < size; i++) {
- Assert.assertTrue("Data didn't match at " + i, outBuf.get(i) ==
rawBuf.get(i));
- }
- } finally {
- allocator.release(b);
- }
- break;
- }
+ BytesInput heapCompressed =
heapCompressor.compress(BytesInput.from(rawArr));
- case OFF_HEAP_BYTES_INPUT: {
- final ByteBuffer buf = compressed.toByteBuffer();
- final ByteBuffer b = allocator.allocate(buf.limit());
- try {
- b.put(buf);
- b.flip();
- final BytesInput input = d.decompress(BytesInput.from(b), size);
- Assert.assertArrayEquals(
- String.format("While testing codec %s", codec),
input.toByteArray(), rawArr);
- } finally {
- allocator.release(b);
- }
- break;
- }
- case ON_HEAP: {
- final byte[] buf = compressed.toByteArray();
- final BytesInput input = d.decompress(BytesInput.from(buf), size);
- Assert.assertArrayEquals(input.toByteArray(), rawArr);
- break;
- }
- }
- c.release();
- d.release();
+ // Validate direct => direct
+ validateDecompress(
+ size,
+ codec,
+ decomp,
+ directCompressed.copy(releaser),
+ allocator,
+ directDecompressor,
+ rawBuf,
+ rawArr);
+
+ // Validate heap => direct
+ validateDecompress(size, codec, decomp, heapCompressed, allocator,
directDecompressor, rawBuf, rawArr);
+
+ // Validate direct => heap
+ validateDecompress(size, codec, decomp, directCompressed, allocator,
heapDecompressor, rawBuf, rawArr);
+
+ directCompressor.release();
+ directDecompressor.release();
+ directCodecFactory.release();
+ heapCompressor.release();
+ heapDecompressor.release();
+ heapCodecFactory.release();
} catch (Exception e) {
final String msg = String.format(
"Failure while testing Codec: %s, OnHeapCompressionInput: %s,
Decompression Mode: %s, Data Size: %d",
@@ -132,6 +129,69 @@ public class TestDirectCodecFactory {
}
}
+ private static void validateDecompress(
+ int size,
+ CompressionCodecName codec,
+ Decompression decomp,
+ BytesInput compressed,
+ ByteBufferAllocator allocator,
+ BytesInputDecompressor d,
+ ByteBuffer rawBuf,
+ byte[] rawArr)
+ throws IOException {
+ switch (decomp) {
+ case OFF_HEAP: {
+ final ByteBuffer buf = compressed.toByteBuffer();
+ final ByteBuffer b = allocator.allocate(buf.capacity() + 20);
+ final ByteBuffer outBuf = allocator.allocate(size + 20);
+ final int shift = 10;
+ try {
+ b.position(shift);
+ b.put(buf);
+ b.position(shift);
+ outBuf.position(shift);
+ d.decompress(b, (int) compressed.size(), outBuf, size);
+ Assert.assertEquals(
+ "Input buffer position mismatch for codec " + codec,
+ compressed.size() + shift,
+ b.position());
+ Assert.assertEquals(
+ "Output buffer position mismatch for codec " + codec, size +
shift, outBuf.position());
+ for (int i = 0; i < size; i++) {
+ Assert.assertTrue(
+ String.format("Data didn't match at %d, while testing codec
%s", i, codec),
+ outBuf.get(shift + i) == rawBuf.get(i));
+ }
+ } finally {
+ allocator.release(b);
+ allocator.release(outBuf);
+ }
+ break;
+ }
+
+ case OFF_HEAP_BYTES_INPUT: {
+ final ByteBuffer buf = compressed.toByteBuffer();
+ final ByteBuffer b = allocator.allocate(buf.limit());
+ try {
+ b.put(buf);
+ b.flip();
+ final BytesInput input = d.decompress(BytesInput.from(b), size);
+ Assert.assertArrayEquals(
+ String.format("While testing codec %s", codec),
input.toByteArray(), rawArr);
+ } finally {
+ allocator.release(b);
+ }
+ break;
+ }
+ case ON_HEAP: {
+ final byte[] buf = compressed.toByteArray();
+ final BytesInput input = d.decompress(BytesInput.from(buf), size);
+ Assert.assertArrayEquals(String.format("While testing codec %s",
codec), input.toByteArray(), rawArr);
+ break;
+ }
+ }
+ }
+
@Test
public void createDirectFactoryWithHeapAllocatorFails() {
String errorMsg =