[ https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634049#comment-17634049 ]

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1021927002


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,33 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            ByteBuffer byteBuffer = bytes.toByteBuffer();
+            BytesInput decompressed;
+
+            if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+              if (blockDecryptor != null) {
+                byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+              }
+              long compressedSize = byteBuffer.limit();
+
+              ByteBuffer decompressedBuffer =
+                  ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());
+              decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,

Review Comment:
   This is not really adding direct buffer support for decompression; that is
already implemented (see `FullDirectDecompressor`).
   For this path only (i.e. the read buffer is a direct buffer and direct
buffer decryption is enabled), the buffer returned by decrypt is a direct byte
buffer. In that case it is efficient to call the direct byte buffer decompress
API, which is already available.
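
   For illustration, a minimal sketch of that branching (not the PR code). It
assumes the names from the diff above plus Parquet's existing
`BytesInputDecompressor` interface, whose ByteBuffer overload is the direct
decompress API referred to here:

{code:java}
// Sketch only, not the PR implementation: mirrors the direct/heap branching
// discussed above. Parameters stand in for fields of ColumnChunkPageReadStore.
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.format.BlockCipher;

class DirectDecryptDecompressSketch {
  static BytesInput decryptAndDecompress(
      DataPageV1 dataPageV1,
      BlockCipher.Decryptor blockDecryptor,   // may be null for plaintext pages
      byte[] dataPageAAD,
      ParquetReadOptions options,
      BytesInputDecompressor decompressor) throws IOException {
    BytesInput bytes = dataPageV1.getBytes();
    ByteBuffer byteBuffer = bytes.toByteBuffer();

    if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
      // Direct path: decrypt returns a direct buffer, so use the existing
      // ByteBuffer-based decompress overload instead of copying to a byte[].
      if (blockDecryptor != null) {
        byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
      }
      ByteBuffer decompressedBuffer =
          ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());
      decompressor.decompress(byteBuffer, byteBuffer.limit(),
          decompressedBuffer, dataPageV1.getUncompressedSize());
      return BytesInput.from(decompressedBuffer, 0, dataPageV1.getUncompressedSize());
    } else {
      // Heap path: keep the existing byte[]-based decrypt and decompress calls.
      if (blockDecryptor != null) {
        bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
      }
      return decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
    }
  }
}
{code}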



##########
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
##########
@@ -196,13 +205,13 @@ public static Collection<Object[]> data() {
    .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME)
     .toString();
 
-  private static final int NUM_THREADS = 4;
+  private static final int NUM_THREADS = 1;

Review Comment:
   Oops. My mistake. 



##########
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##########
@@ -44,6 +46,9 @@ public class ParquetReadOptions {
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
   private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
   private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+  // Default to true if JDK 17 or newer.
+  private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
+    SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17);

Review Comment:
   It looks like ByteBuffer-based decryption does not get hardware
acceleration until JDK 17. So, if and only if the file read buffer is a direct
byte buffer and the JDK is 17 or newer, decryption will be as fast as with
byte arrays, and we will also avoid the copy from a direct byte buffer to a
byte array in `ColumnChunkPageReadStore`.
   I can default to false if you prefer.
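
   For reference, a self-contained sketch of the version-gated default shown
in the diff above; `SystemUtils`/`JavaVersion` come from Apache Commons Lang 3
(3.12.0 or later for `JAVA_17`):

{code:java}
// Sketch: off-heap (ByteBuffer) decryption is only enabled by default when
// the JVM can hardware-accelerate it, i.e. on JDK 17 or newer.
import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.SystemUtils;

public class OffHeapDecryptDefaultSketch {
  static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
      SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17);

  public static void main(String[] args) {
    System.out.println("off-heap decrypt buffer default = "
        + USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT);
  }
}
{code}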
   



##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -51,17 +52,26 @@
     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
     * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
      */
-    public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+    byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
 
     /**
+     * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+     *
+     * @param from ByteBuffer with length and ciphertext.
+     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+     * @return plaintext - starts at offset 0 of the output, and fills up the entire byte array.

Review Comment:
   Done
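
   To make the new overload concrete, here is an illustrative sketch (not the
PR implementation) of how an AES-GCM decryptor could serve the ByteBuffer API
without copying out of a direct buffer; the length-prefix/nonce layout and the
names used are assumptions for the example:

{code:java}
// Illustrative only: decrypts a "length + nonce + ciphertext + tag" buffer,
// using the ByteBuffer variant of Cipher.doFinal so no intermediate byte[]
// copy is required.
import java.nio.ByteBuffer;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

class GcmByteBufferDecryptSketch {
  private static final int SIZE_LENGTH = 4;    // assumed 4-byte length prefix
  private static final int NONCE_LENGTH = 12;  // assumed 12-byte GCM nonce
  private static final int TAG_LENGTH_BITS = 128;

  static ByteBuffer decrypt(ByteBuffer from, byte[] AAD, SecretKeySpec key) throws Exception {
    // Skip the length prefix and read the nonce from the input buffer.
    from.position(from.position() + SIZE_LENGTH);
    byte[] nonce = new byte[NONCE_LENGTH];
    from.get(nonce);

    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_LENGTH_BITS, nonce));
    if (AAD != null) {
      cipher.updateAAD(AAD);
    }

    // doFinal(ByteBuffer, ByteBuffer) operates directly on (direct) buffers.
    ByteBuffer plaintext = ByteBuffer.allocateDirect(cipher.getOutputSize(from.remaining()));
    cipher.doFinal(from, plaintext);
    plaintext.flip();
    return plaintext;
  }
}
{code}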



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,33 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            ByteBuffer byteBuffer = bytes.toByteBuffer();

Review Comment:
   Done.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,33 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            ByteBuffer byteBuffer = bytes.toByteBuffer();
+            BytesInput decompressed;
+
+            if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+              if (blockDecryptor != null) {
+                byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+              }
+              long compressedSize = byteBuffer.limit();
+
+              ByteBuffer decompressedBuffer =
+                  ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());

Review Comment:
   Hmm. Using a buffer pool here is a major PR by itself. The big problem is
managing the lifecycle of the allocated buffer: the buffer is passed on to the
caller, which can then keep a reference as long as it wants. Somewhere we have
to guarantee that the buffer is returned to the pool when it is no longer
needed, and we cannot do that here.
   I've changed this to use the allocator specified in the ParquetReadOptions
(which I should have done in the first place). The allocator API has a
`release` method which the caller needs to call. The provided allocator is the
best place for a buffer pool implementation.
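
   As a small sketch of that allocate/release contract (assuming
`ByteBufferAllocator` from `org.apache.parquet.bytes`, e.g. the one returned
by `ParquetReadOptions.getAllocator()`):

{code:java}
// Sketch of the caller-side lifecycle: whoever receives the buffer must hand
// it back to the allocator, which is where a pooling implementation would
// recycle it.
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferAllocator;

class AllocatorLifecycleSketch {
  static void useAndRelease(ByteBufferAllocator allocator, int uncompressedSize) {
    ByteBuffer decompressedBuffer = allocator.allocate(uncompressedSize);
    try {
      // ... read the decompressed page out of the buffer ...
    } finally {
      // Return the buffer so a pooling allocator can reuse it.
      allocator.release(decompressedBuffer);
    }
  }
}
{code}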





> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> ------------------------------------------------------------------------
>
>                 Key: PARQUET-2212
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2212
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>    Affects Versions: 1.12.3
>            Reporter: Parth Chandra
>            Priority: Major
>             Fix For: 1.12.3
>
>
> The decrypt API in BlockCipher.Decryptor currently only provides an API that 
> takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the 
> cost of copying the data into a byte array (and sometimes back to a 
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids 
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the 
> ByteBuffer-based API if the buffer is a DirectByteBuffer. If the buffer is a 
> HeapByteBuffer, then we can continue to use the byte array API since that 
> does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not 
> able to use hardware acceleration in JVMs before JDK 17. In those cases, the 
> overall decryption speed is faster with byte arrays even after incurring the 
> overhead of making a copy.
> The proposal, then, is to enable the use of the ByteBuffer API for 
> DirectByteBuffers only, and only if the JDK is 17 or newer or the user 
> explicitly configures it.
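
For illustration, a minimal sketch of the selection between the two overloads
described above (`decryptor`, `buffer`, `offHeapDecryptEnabled`, and
`moduleAAD` are placeholders for this example, not names from the patch):

{code:java}
// Sketch only: pick the decrypt overload based on the buffer type and the
// off-heap decryption setting described in the proposal.
import java.nio.ByteBuffer;
import org.apache.parquet.format.BlockCipher;

class DecryptOverloadSelectionSketch {
  static ByteBuffer decryptModule(BlockCipher.Decryptor decryptor, ByteBuffer buffer,
      boolean offHeapDecryptEnabled, byte[] moduleAAD) {
    if (buffer.isDirect() && offHeapDecryptEnabled) {
      // Proposed overload: avoids copying the direct buffer into a byte[].
      return decryptor.decrypt(buffer, moduleAAD);
    }
    // Existing overload: byte[] decryption is faster on pre-17 JDKs; for a
    // heap buffer the backing array can be used directly (arrayOffset and
    // position handling omitted in this sketch).
    byte[] lengthAndCiphertext = buffer.hasArray() ? buffer.array() : copyToArray(buffer);
    return ByteBuffer.wrap(decryptor.decrypt(lengthAndCiphertext, moduleAAD));
  }

  private static byte[] copyToArray(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes);
    return bytes;
  }
}
{code}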



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
