[
https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17675813#comment-17675813
]
ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------
wgtmac commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1067699349
##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -51,17 +52,26 @@
* @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
* @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
*/
- public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+ byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
/**
+ * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+ *
+ * @param from ByteBuffer with length and ciphertext.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output, and fills up the entire byte buffer.
Review Comment:
```suggestion
   * @return plaintext - starts at offset 0 of the output, and fills up the entire byte buffer.
```
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+
Review Comment:
Remove the blank line
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -187,6 +189,7 @@ public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
private final InputFile file;
private final Path path;
private Filter filter = null;
+ private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
Review Comment:
```suggestion
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
```
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
public DataPage visit(DataPageV1 dataPageV1) {
try {
BytesInput bytes = dataPageV1.getBytes();
- if (null != blockDecryptor) {
- bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+ BytesInput decompressed;
+
+ if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+ ByteBuffer byteBuffer = bytes.toByteBuffer();
+ if (!byteBuffer.isDirect()) {
Review Comment:
Should we fall back to the old path in this case?
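A minimal sketch of such a fallback, reusing the names visible in this hunk (whether `BytesInput.from(ByteBuffer)` is available at this point is an assumption):
```java
ByteBuffer byteBuffer = bytes.toByteBuffer();
if (null != blockDecryptor && byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
  // direct buffer: decrypt through the new ByteBuffer overload, no copy
  bytes = BytesInput.from(blockDecryptor.decrypt(byteBuffer, dataPageAAD));
} else if (null != blockDecryptor) {
  // heap buffer: fall back to the existing byte[] path instead of failing
  bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
}
```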
##########
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
##########
@@ -202,7 +211,7 @@ public static Collection<Object[]> data() {
private static final boolean plaintextFilesAllowed = true;
- private static final int ROW_COUNT = 10000;
+ private static final int ROW_COUNT = 100;
Review Comment:
Why decrease the row count?
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
Review Comment:
There is a lot of duplicated code shared with `byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD)`. Is it possible to refactor the two methods to share common logic?
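One possible shape, as a sketch rather than a drop-in change (the helper name `decryptCore` is hypothetical):
```java
// Hypothetical shared core: nonce extraction, cipher init, and the
// chunked update loop would live here exactly once.
private ByteBuffer decryptCore(ByteBuffer lengthAndCiphertext, byte[] AAD) {
  throw new UnsupportedOperationException("sketch only");
}

@Override
public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
  // wrap the array and delegate; copy the plaintext back out as byte[]
  ByteBuffer plainText = decryptCore(ByteBuffer.wrap(lengthAndCiphertext), AAD);
  byte[] out = new byte[plainText.remaining()];
  plainText.get(out);
  return out;
}

@Override
public ByteBuffer decrypt(ByteBuffer lengthAndCiphertext, byte[] AAD) {
  return decryptCore(lengthAndCiphertext, AAD);
}
```
One caveat: the byte[] overload currently decrypts into a fresh array, while the ByteBuffer overload reuses the input buffer as the output buffer, so the shared core would probably need to take the output buffer (or an allocator) as a parameter.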
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
public DataPage visit(DataPageV1 dataPageV1) {
Review Comment:
Does it apply to data page v2 as well?
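If it should, the v2 visitor could presumably mirror the same dispatch; a sketch, assuming `DataPageV2.getData()` exposes the encrypted bytes as `BytesInput` the way the v1 page does:
```java
@Override
public DataPage visit(DataPageV2 dataPageV2) {
  BytesInput pageBytes = dataPageV2.getData();
  if (null != blockDecryptor) {
    ByteBuffer byteBuffer = pageBytes.toByteBuffer();
    // same dispatch as the v1 visitor: ByteBuffer path for direct buffers only
    if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
      pageBytes = BytesInput.from(blockDecryptor.decrypt(byteBuffer, dataPageAAD));
    } else {
      pageBytes = BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
    }
  }
  // decompression and page reconstruction elided; see the existing v2 path
  return dataPageV2;
}
```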
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java:
##########
@@ -144,6 +144,11 @@
*/
public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";
+ /**
+ * Key to configure if off-heap buffer should be used for decryption
+ */
+ public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled";
Review Comment:
Please document the new config parameter in the
[README](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md)
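For that documentation, a usage example might help; a minimal sketch (only the key string comes from this diff, the class name is illustrative):
```java
import org.apache.hadoop.conf.Configuration;

public class OffHeapDecryptExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // opt in to decrypting direct buffers in place via the ByteBuffer API
    conf.setBoolean("parquet.decrypt.off-heap.buffer.enabled", true);
  }
}
```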
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+
+ int cipherTextOffset = SIZE_LENGTH;
+ int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
+
+ int plainTextLength = cipherTextLength - NONCE_LENGTH;
+ if (plainTextLength < 1) {
+ throw new ParquetCryptoRuntimeException("Wrong input length " +
plainTextLength);
+ }
+
+ // skip size
+ ciphertext.position(ciphertext.position() + cipherTextOffset);
+ // Get the nonce from ciphertext
+ ciphertext.get(ctrIV, 0, NONCE_LENGTH);
+
+ // Reuse the input buffer as the output buffer
+ ByteBuffer plainText = ciphertext.slice();
+ plainText.limit(plainTextLength);
+ int inputLength = cipherTextLength - NONCE_LENGTH;
+ int inputOffset = cipherTextOffset + NONCE_LENGTH;
+ try {
+ IvParameterSpec spec = new IvParameterSpec(ctrIV);
+ cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
+
+ // Breaking decryption into multiple updates, to trigger h/w acceleration in Java 9+
+ while (inputLength > CHUNK_LENGTH) {
+ ciphertext.position(inputOffset);
+ ciphertext.limit(inputOffset + CHUNK_LENGTH);
+ cipher.update(ciphertext, plainText);
+ inputOffset += CHUNK_LENGTH;
+ inputLength -= CHUNK_LENGTH;
+ }
+ ciphertext.position(inputOffset);
+ ciphertext.limit(inputOffset + inputLength);
+ cipher.doFinal(ciphertext, plainText);
+ plainText.flip();
+ } catch (GeneralSecurityException e) {
+ throw new ParquetCryptoRuntimeException("Failed to decrypt", e);
Review Comment:
```suggestion
throw new ParquetCryptoRuntimeException("Failed to decrypt ", e);
```
##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -36,11 +37,11 @@
* The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
* Parquet Modular Encryption specification.
*/
- public byte[] encrypt(byte[] plaintext, byte[] AAD);
+ byte[] encrypt(byte[] plaintext, byte[] AAD);
Review Comment:
Will the encryptor benefit from a similar overload of `ByteBuffer`?
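If it would, the signature could simply mirror the decrypt side; a sketch of a possible overload (not part of this PR):
```java
/**
 * Hypothetical ByteBuffer counterpart of encrypt(byte[], byte[]):
 * encrypts the remaining bytes of plaintext and returns a buffer with
 * position 0 holding the nonce, ciphertext and (for GCM) the tag.
 */
ByteBuffer encrypt(ByteBuffer plaintext, byte[] AAD);
```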
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java:
##########
@@ -78,6 +79,38 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
Review Comment:
ditto
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -242,6 +245,12 @@ public Builder<T> withFilter(Filter filter) {
return this;
}
+ public Builder<T> withAllocator(ByteBufferAllocator allocator) {
Review Comment:
What is the purpose of setting it here? It will override the one in the
`ParquetReadOptions`.
> Add ByteBuffer API for decryptors to allow direct memory to be decrypted
> ------------------------------------------------------------------------
>
> Key: PARQUET-2212
> URL: https://issues.apache.org/jira/browse/PARQUET-2212
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-mr
> Affects Versions: 1.12.3
> Reporter: Parth Chandra
> Priority: Major
> Fix For: 1.12.3
>
>
> The decrypt API in BlockCipher.Decryptor currently only provides an API that
> takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the
> cost of copying the data into a byte array (and sometimes back to a
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the
> ByteBuffer-based API if the buffer is a DirectByteBuffer. If the buffer is a
> HeapByteBuffer, then we can continue to use the byte array API since that
> does not incur a copy when the underlying byte array is accessed.
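> A minimal sketch of that dispatch (variable names are illustrative, the reader plumbing is elided):
> {code:java}
> ByteBuffer buf = bytes.toByteBuffer();
> ByteBuffer plaintext;
> if (buf.isDirect()) {
>   // direct buffer: decrypt through the proposed ByteBuffer API, no copy
>   plaintext = decryptor.decrypt(buf, AAD);
> } else {
>   // heap buffer: the byte[] API reads the backing array without a copy
>   plaintext = ByteBuffer.wrap(decryptor.decrypt(bytes.toByteArray(), AAD));
> }
> {code}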
> Also, some investigation has shown that decryption with ByteBuffers is not
> able to use hardware acceleration in JVMs before JDK 17. In those cases, the
> overall decryption speed is faster with byte arrays even after incurring the
> overhead of making a copy.
> The proposal, then, is to enable the use of the ByteBuffer API for
> DirectByteBuffers only, and only if the JDK is JDK 17 or higher or the user
> explicitly configures it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)