parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1021927002


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,33 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            ByteBuffer byteBuffer = bytes.toByteBuffer();
+            BytesInput decompressed;
+
+            if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+              if (blockDecryptor != null) {
+                byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+              }
+              long compressedSize = byteBuffer.limit();
+
+              ByteBuffer decompressedBuffer =
+                  ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());
+              decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,

Review Comment:
   This is not really adding direct buffer support for decompression. That is
already implemented (see `FullDirectDecompressor`).
   For this path only (i.e., the read buffer is direct and direct buffer
decryption is enabled), the buffer returned by decrypt is a direct byte buffer.
In this case, it is efficient to call the direct byte buffer decompress API,
which is already available.
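
   To illustrate the direct-to-direct idea, here is a minimal sketch that
decompresses from one direct buffer into another without an intermediate byte
array. It uses the JDK's own `Inflater`/`Deflater` ByteBuffer methods (Java 11
or newer) as a stand-in; it is not Parquet's `FullDirectDecompressor`, and the
class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical demo class: JDK zlib stands in for a Parquet codec.
public class DirectDecompressSketch {

  // Helper for the demo only: compress a buffer into a new direct buffer.
  public static ByteBuffer compress(ByteBuffer src) {
    // Generous bound; zlib's worst-case expansion is far smaller than this.
    ByteBuffer out = ByteBuffer.allocateDirect(src.remaining() * 2 + 64);
    Deflater deflater = new Deflater();
    try {
      deflater.setInput(src);
      deflater.finish();
      while (!deflater.finished() && out.hasRemaining()) {
        deflater.deflate(out);
      }
    } finally {
      deflater.end();
    }
    out.flip();
    return out;
  }

  // Decompress straight into a direct buffer of the known uncompressed size.
  public static ByteBuffer decompress(ByteBuffer compressed, int uncompressedSize) {
    ByteBuffer out = ByteBuffer.allocateDirect(uncompressedSize);
    Inflater inflater = new Inflater();
    try {
      inflater.setInput(compressed);
      while (!inflater.finished() && out.hasRemaining()) {
        int written = inflater.inflate(out);
        // Stop instead of spinning if the input is truncated or needs a dictionary.
        if (written == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
          break;
        }
      }
    } catch (DataFormatException e) {
      throw new IllegalStateException("Corrupt compressed data", e);
    } finally {
      inflater.end();
    }
    out.flip();
    return out;
  }

  // Round-trips a string through direct buffers only.
  public static String roundTrip(String text) {
    byte[] raw = text.getBytes(StandardCharsets.UTF_8);
    ByteBuffer plain = ByteBuffer.allocateDirect(raw.length);
    plain.put(raw);
    plain.flip();
    ByteBuffer restored = decompress(compress(plain), raw.length);
    return StandardCharsets.UTF_8.decode(restored).toString();
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("page bytes page bytes page bytes"));
  }
}
```

   The point is only that, once decrypt hands back a direct buffer, the
decompress step can consume it as-is.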



##########
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
##########
@@ -196,13 +205,13 @@ public static Collection<Object[]> data() {
     .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME)
     .toString();
 
-  private static final int NUM_THREADS = 4;
+  private static final int NUM_THREADS = 1;

Review Comment:
   Oops. My mistake. 



##########
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##########
@@ -44,6 +46,9 @@ public class ParquetReadOptions {
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
   private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
   private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+  // Default to true if JDK 17 or newer.
+  private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
+    SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17);

Review Comment:
   It looks like decryption from ByteBuffers is not hardware accelerated until
JDK 17. So, if and only if the file read buffer is a direct byte buffer and the
JDK is version 17 or newer, decryption will be as fast as with byte arrays, and
we will also avoid the copy from a direct byte buffer to a byte array in
`ColumnChunkPageReadStore`.
   I can default to false if you prefer.
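
   For reference, the rule described above can be sketched with the JDK alone.
This is an illustration, not the PR's code: the PR uses `SystemUtils` from
commons-lang, while this sketch assumes `Runtime.version().feature()` (Java 10
or newer), and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class mirroring the "default to true on JDK 17+" rule.
public class OffHeapDecryptDefaultSketch {

  // First JDK feature release on which the default would be true.
  static final int MIN_FEATURE_VERSION = 17;

  // Pure function so the rule can be checked for any version number.
  public static boolean defaultForFeatureVersion(int featureVersion) {
    return featureVersion >= MIN_FEATURE_VERSION;
  }

  // Default for the JVM this code is running on.
  public static boolean defaultForRunningJvm() {
    return defaultForFeatureVersion(Runtime.version().feature());
  }

  // The off-heap path is taken only when the page bytes are already in a
  // direct buffer AND the option is enabled; otherwise the byte[] path is used.
  public static boolean takesDirectPath(ByteBuffer pageBytes, boolean useOffHeapDecryptBuffer) {
    return pageBytes.isDirect() && useOffHeapDecryptBuffer;
  }

  public static void main(String[] args) {
    System.out.println("default on this JVM: " + defaultForRunningJvm());
  }
}
```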
   



##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -51,17 +52,26 @@
      * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
      * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
      */
-    public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+    byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
 
     /**
+     * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+     *
+     * @param from ByteBuffer with length and ciphertext.
+     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+     * @return plaintext -  starts at offset 0 of the output, and fills up the entire byte array.

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,33 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            ByteBuffer byteBuffer = bytes.toByteBuffer();

Review Comment:
   Done.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,33 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            ByteBuffer byteBuffer = bytes.toByteBuffer();
+            BytesInput decompressed;
+
+            if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+              if (blockDecryptor != null) {
+                byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+              }
+              long compressedSize = byteBuffer.limit();
+
+              ByteBuffer decompressedBuffer =
+                  ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());

Review Comment:
   Hmm. Using a buffer pool here is a major PR by itself. The big problem is
managing the lifecycle of the allocated buffer. The allocated buffer is passed
on to the caller, which can then keep a reference for as long as it wants.
Somewhere, we have to guarantee that the buffer is returned to the pool when it
is no longer needed, but we cannot do that here.
   I've changed this to use the allocator specified in the
ParquetReadOptions (which I should have done in the first place). The allocator
API has a `release` method which the caller needs to call. The provided
allocator is the best place for the buffer pool implementation.
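
   As a rough sketch of what "pooling inside the allocator" could look like:
the interface below is a hypothetical stand-in with only the allocate/release
pair mentioned above, not Parquet's actual allocator type, and the pooling
policy (first buffer that is large enough) is an assumption chosen for brevity.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical demo class; names are illustrative only.
public class PooledAllocatorSketch {

  // Minimal stand-in for an allocator with an explicit release step.
  public interface SimpleAllocator {
    ByteBuffer allocate(int size);
    void release(ByteBuffer buffer);
  }

  // Reuses released direct buffers instead of allocating a new one each time.
  public static final class PoolingDirectAllocator implements SimpleAllocator {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private int freshAllocations = 0;

    @Override
    public synchronized ByteBuffer allocate(int size) {
      // Hand out the first pooled buffer that is large enough.
      for (Iterator<ByteBuffer> it = free.iterator(); it.hasNext();) {
        ByteBuffer candidate = it.next();
        if (candidate.capacity() >= size) {
          it.remove();
          candidate.clear();
          candidate.limit(size); // expose only the requested number of bytes
          return candidate;
        }
      }
      freshAllocations++;
      return ByteBuffer.allocateDirect(size);
    }

    @Override
    public synchronized void release(ByteBuffer buffer) {
      // The caller owns the lifecycle: a buffer re-enters the pool only
      // when the caller explicitly gives it back.
      free.push(buffer);
    }

    public synchronized int freshAllocations() {
      return freshAllocations;
    }
  }

  public static void main(String[] args) {
    PoolingDirectAllocator allocator = new PoolingDirectAllocator();
    ByteBuffer page = allocator.allocate(1024);
    allocator.release(page);
    ByteBuffer next = allocator.allocate(512);
    System.out.println("reused: " + (next == page));
  }
}
```

   The read path only ever calls `allocate`; whoever ends up holding the page
decides when to call `release`, which is why the pool belongs behind the
allocator rather than in the page reader.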



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
