[ https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676589#comment-17676589 ]

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1069182893


##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -51,17 +52,26 @@
     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
     * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
      */
-    public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+    byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
 
     /**
+     * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+     *
+     * @param from ByteBuffer with length and ciphertext.
+     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+     * @return plaintext - starts at offset 0 of the output, and fills up the entire byte buffer.

Review Comment:
   Done
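
   For reference, a minimal usage sketch of the new overload (the class, method, and variable names here are assumed for illustration; the in-place semantics follow the Javadoc above):

{code:java}
import java.nio.ByteBuffer;
import org.apache.parquet.format.BlockCipher;

// Hedged usage sketch: decrypt a buffer holding [length][ciphertext]
// via the new ByteBuffer overload. Names are illustrative only.
public final class DecryptSketch {
  public static ByteBuffer readPlaintext(BlockCipher.Decryptor decryptor,
                                         ByteBuffer lengthAndCiphertext,
                                         byte[] moduleAAD) {
    // Per the Javadoc: plaintext starts at offset 0 of the returned
    // buffer and fills it entirely; the AAD is ignored for CTR.
    return decryptor.decrypt(lengthAndCiphertext, moduleAAD);
  }
}
{code}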



##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -36,11 +37,11 @@
     * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
     * Parquet Modular Encryption specification.
     */
-    public byte[] encrypt(byte[] plaintext, byte[] AAD);
+    byte[] encrypt(byte[] plaintext, byte[] AAD);

Review Comment:
   Probably. This PR only focuses on the read path, though. I can add the encryptor API when I look at the write path.



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 
     return plainText;
   }
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java:
##########
@@ -144,6 +144,11 @@
    */
  public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";
 
+  /**
+   * Key to configure if off-heap buffer should be used for decryption
+   */
+  public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled";

Review Comment:
   Done
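
   As a usage note, the new key can presumably be toggled like any other Hadoop-backed read option; a minimal sketch (the surrounding reader setup is assumed):

{code:java}
import org.apache.hadoop.conf.Configuration;

// Hedged sketch: enable the off-heap decrypt buffer via the new key.
Configuration conf = new Configuration();
conf.setBoolean("parquet.decrypt.off-heap.buffer.enabled", true);
{code}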



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {

Review Comment:
   Actually, it does. Added it for V2 and updated the unit tests as well. 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -242,6 +245,12 @@ public Builder<T> withFilter(Filter filter) {
       return this;
     }
 
+    public Builder<T> withAllocator(ByteBufferAllocator allocator) {

Review Comment:
   Well, yes. ParquetReader has an instance of `ParquetReadOptions`, and this builder simply passes the allocator down to that instance. Adding this here mirrors the other options set in `ParquetReader.Builder`. It is used in the unit tests.
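
   A plausible shape for that pass-through (sketch only, not necessarily the PR's exact code; the `allocator` field is the one added elsewhere in this PR):

{code:java}
// Hedged sketch of the builder method described above: the allocator is
// held on the builder and handed to the ParquetReadOptions instance
// when the reader is built.
public Builder<T> withAllocator(ByteBufferAllocator allocator) {
  this.allocator = allocator;
  return this;
}
{code}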



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            BytesInput decompressed;
+
+            if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+              ByteBuffer byteBuffer = bytes.toByteBuffer();
+              if (!byteBuffer.isDirect()) {

Review Comment:
   I think there was a previous discussion on this, and we simplified the code to throw an error. Basically, we expect that a user of the ParquetReader will be using either a direct buffer or a heap buffer all the way through, so encountering both a direct buffer and a heap buffer is an error.
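
   In other words, the read path asserts buffer homogeneity. Roughly, per the hunk above (sketch only; the exact error message is assumed):

{code:java}
// Hedged sketch of the check described above (message text assumed):
if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
  ByteBuffer byteBuffer = bytes.toByteBuffer();
  if (!byteBuffer.isDirect()) {
    // Mixing a heap page buffer into a direct-buffer read path is
    // treated as a configuration error rather than silently copied.
    throw new ParquetCryptoRuntimeException("Expected a direct buffer");
  }
  // ... decrypt in place via the ByteBuffer API ...
}
{code}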



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -187,6 +189,7 @@ public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
     private final InputFile file;
     private final Path path;
     private Filter filter = null;
+    private ByteBufferAllocator allocator = new HeapByteBufferAllocator();

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java:
##########
@@ -78,6 +79,38 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
     return plainText;
   }
 
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 
     return plainText;
   }
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+
+    int cipherTextOffset = SIZE_LENGTH;
+    int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
+
+    int plainTextLength = cipherTextLength - NONCE_LENGTH;
+    if (plainTextLength < 1) {
+      throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
+    }
+
+    // skip size
+    ciphertext.position(ciphertext.position() + cipherTextOffset);
+    // Get the nonce from ciphertext
+    ciphertext.get(ctrIV, 0, NONCE_LENGTH);
+
+    // Reuse the input buffer as the output buffer
+    ByteBuffer plainText = ciphertext.slice();
+    plainText.limit(plainTextLength);
+    int inputLength = cipherTextLength - NONCE_LENGTH;
+    int inputOffset = cipherTextOffset + NONCE_LENGTH;
+    try {
+      IvParameterSpec spec = new IvParameterSpec(ctrIV);
+      cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
+
+      // Breaking decryption into multiple updates, to trigger h/w acceleration in Java 9+
+      while (inputLength > CHUNK_LENGTH) {
+        ciphertext.position(inputOffset);
+        ciphertext.limit(inputOffset + CHUNK_LENGTH);
+        cipher.update(ciphertext, plainText);
+        inputOffset += CHUNK_LENGTH;
+        inputLength -= CHUNK_LENGTH;
+      }
+      ciphertext.position(inputOffset);
+      ciphertext.limit(inputOffset + inputLength);
+      cipher.doFinal(ciphertext, plainText);
+      plainText.flip();
+    } catch (GeneralSecurityException e) {
+      throw new ParquetCryptoRuntimeException("Failed to decrypt", e);

Review Comment:
   The original implementation in `decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD)` formats the message the same way (without the trailing space). Leaving it as is to keep the two consistent.
   If you want, we can change both. I would rather we terminate the message with a `.` (period).



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 
     return plainText;
   }
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {

Review Comment:
   I've tried to, but the changes are rather intertwined: we would end up with multiple functions abstracting out the common lines of code, none of which forms a complete unit by itself. I feel the current version is easier to read and maintain.



##########
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
##########
@@ -202,7 +211,7 @@ public static Collection<Object[]> data() {
 
   private static final boolean plaintextFilesAllowed = true;
 
-  private static final int ROW_COUNT = 10000;
+  private static final int ROW_COUNT = 100;

Review Comment:
   TBH, I forget why. But on looking at it again, and rerunning the tests, I've 
_increased_ the value to test a part of the `AesCtrDecryptor` that was not 
getting tested before.





> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> ------------------------------------------------------------------------
>
>                 Key: PARQUET-2212
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2212
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>    Affects Versions: 1.12.3
>            Reporter: Parth Chandra
>            Priority: Major
>             Fix For: 1.12.3
>
>
> The decrypt API in BlockCipher.Decryptor currently only provides an api that 
> takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the 
> cost of copying the data into a byte array (and sometimes back to a 
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids 
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the 
> ByteBuffer based api if the buffer is a DirectByteBuffer. If the buffer is a 
> HeapByteBuffer, then we can continue to use the byte array API since that 
> does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not 
> able to use hardware acceleration in JVMs before JDK17. In those cases, the 
> overall decryption speed is faster with byte arrays even after incurring the 
> overhead of making a copy.
> The proposal, then, is to enable the use of the ByteBuffer API for 
> DirectByteBuffers only, and only if the JDK is JDK17 or higher or the user 
> explicitly configures it.
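
A condensed sketch of that proposed gating (not the PR's exact code; option and variable names follow the hunks quoted above):

{code:java}
// Hedged sketch: use the ByteBuffer API only for direct buffers, and
// only when the off-heap decrypt buffer is explicitly enabled.
BytesInput decrypted;
ByteBuffer buffer = bytes.toByteBuffer();
if (buffer.isDirect() && options.useOffHeapDecryptBuffer()) {
  decrypted = BytesInput.from(blockDecryptor.decrypt(buffer, dataPageAAD));
} else {
  decrypted = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
}
{code}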



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
