[ https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676589#comment-17676589 ]
ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------
parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1069182893
##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -51,17 +52,26 @@
* @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
* @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
*/
- public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+ byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
/**
+ * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+ *
+ * @param from ByteBuffer with length and ciphertext.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output, and fills up the entire byte buffer.
Review Comment:
Done
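For reference, the completed interface change presumably reads as follows; a sketch assuming the `Decryptor` inner interface of `BlockCipher` and the `ByteBuffer decrypt(ByteBuffer from, byte[] AAD)` signature proposed in the JIRA description below:
{code:java}
import java.nio.ByteBuffer;

public interface BlockCipher {
  interface Decryptor {
    /** Existing API: the input begins with the ciphertext length, followed by the ciphertext. */
    byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);

    /** New convenience API: reads the length and ciphertext from a ByteBuffer;
     *  the returned plaintext may reuse the input buffer (decrypted in place). */
    ByteBuffer decrypt(ByteBuffer from, byte[] AAD);
  }
}
{code}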
##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -36,11 +37,11 @@
* The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
* Parquet Modular Encryption specification.
*/
*/
- public byte[] encrypt(byte[] plaintext, byte[] AAD);
+ byte[] encrypt(byte[] plaintext, byte[] AAD);
Review Comment:
Probably. This PR only focuses on the read path, though. I can add the encryptor API when I look at the write path.
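A hypothetical sketch of that future write-path counterpart on `BlockCipher.Encryptor`, mirroring the read-side overload (not part of this PR):
{code:java}
// Hypothetical, not in this PR: a ByteBuffer counterpart for the write path.
// Mirrors the read-side overload; the result could reuse the input buffer.
ByteBuffer encrypt(ByteBuffer plaintext, byte[] AAD);
{code}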
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+
Review Comment:
Done
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java:
##########
@@ -144,6 +144,11 @@
*/
public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";
+ /**
+ * Key to configure if off-heap buffer should be used for decryption
+ */
+ public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled";
Review Comment:
Done
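For anyone picking this up later, enabling the option from a Hadoop `Configuration` would look roughly like this (a sketch using the key defined above):
{code:java}
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Opt in to decrypting into off-heap buffers on the read path.
conf.setBoolean(ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED, true);
// Equivalent: conf.setBoolean("parquet.decrypt.off-heap.buffer.enabled", true);
{code}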
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
public DataPage visit(DataPageV1 dataPageV1) {
Review Comment:
Actually, it does. Added it for V2 and updated the unit tests as well.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -242,6 +245,12 @@ public Builder<T> withFilter(Filter filter) {
return this;
}
+ public Builder<T> withAllocator(ByteBufferAllocator allocator) {
Review Comment:
Well, yes. ParquetReader has an instance of `ParquetReadOptions`, and this builder simply passes the allocator down to that instance. Adding it here mirrors the other options set in `ParquetReader.Builder`. It is used in the unit tests.
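For example, roughly (a sketch; `DirectByteBufferAllocator` and `GroupReadSupport` are the existing parquet-mr classes, and the file path is illustrative):
{code:java}
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

try (ParquetReader<Group> reader =
    ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/encrypted.parquet"))
        .withAllocator(new DirectByteBufferAllocator()) // passed down to ParquetReadOptions
        .build()) {
  Group record;
  while ((record = reader.read()) != null) {
    // process record
  }
}
{code}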
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
public DataPage visit(DataPageV1 dataPageV1) {
try {
BytesInput bytes = dataPageV1.getBytes();
- if (null != blockDecryptor) {
- bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+ BytesInput decompressed;
+
+ if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+ ByteBuffer byteBuffer = bytes.toByteBuffer();
+ if (!byteBuffer.isDirect()) {
Review Comment:
I think there was a previous discussion on this, and we simplified the code to throw an error. Basically, we expect that a user of the ParquetReader will be using either a direct buffer or a heap buffer all the way through, so encountering both a direct buffer and a heap buffer is an error.
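In other words, the guard boils down to something like this (a sketch; the exception type and message are illustrative, not the PR's exact code):
{code:java}
if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
  ByteBuffer byteBuffer = bytes.toByteBuffer();
  if (!byteBuffer.isDirect()) {
    // Readers are expected to be all-direct or all-heap; mixing is an error.
    throw new ParquetDecodingException("Expected a direct buffer"); // illustrative
  }
  // Decrypt in place and wrap the plaintext without copying to a byte array.
  bytes = BytesInput.from(blockDecryptor.decrypt(byteBuffer, dataPageAAD));
}
{code}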
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -187,6 +189,7 @@ public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
private final InputFile file;
private final Path path;
private Filter filter = null;
+ private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
Review Comment:
Done
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java:
##########
@@ -78,6 +79,38 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
Review Comment:
Done
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+
+ int cipherTextOffset = SIZE_LENGTH;
+ int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
+
+ int plainTextLength = cipherTextLength - NONCE_LENGTH;
+ if (plainTextLength < 1) {
+ throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
+ }
+
+ // skip size
+ ciphertext.position(ciphertext.position() + cipherTextOffset);
+ // Get the nonce from ciphertext
+ ciphertext.get(ctrIV, 0, NONCE_LENGTH);
+
+ // Reuse the input buffer as the output buffer
+ ByteBuffer plainText = ciphertext.slice();
+ plainText.limit(plainTextLength);
+ int inputLength = cipherTextLength - NONCE_LENGTH;
+ int inputOffset = cipherTextOffset + NONCE_LENGTH;
+ try {
+ IvParameterSpec spec = new IvParameterSpec(ctrIV);
+ cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
+
+ // Breaking decryption into multiple updates, to trigger h/w acceleration in Java 9+
+ while (inputLength > CHUNK_LENGTH) {
+ ciphertext.position(inputOffset);
+ ciphertext.limit(inputOffset + CHUNK_LENGTH);
+ cipher.update(ciphertext, plainText);
+ inputOffset += CHUNK_LENGTH;
+ inputLength -= CHUNK_LENGTH;
+ }
+ ciphertext.position(inputOffset);
+ ciphertext.limit(inputOffset + inputLength);
+ cipher.doFinal(ciphertext, plainText);
+ plainText.flip();
+ } catch (GeneralSecurityException e) {
+ throw new ParquetCryptoRuntimeException("Failed to decrypt", e);
Review Comment:
The original implementation in `decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD)` has it the same way (without the trailing space). Leaving it as it is to keep it consistent.
If you want, we can change both. I would rather we terminate the message
with a `.` (period).
##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
return plainText;
}
+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
Review Comment:
I've tried to, but the changes are rather intertwined and we'll end up with
multiple functions abstracting out the common lines of code, with none of the
functions seeming to be a complete unit by itself. I feel the current version
is easier to read and maintain.
##########
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
##########
@@ -202,7 +211,7 @@ public static Collection<Object[]> data() {
private static final boolean plaintextFilesAllowed = true;
- private static final int ROW_COUNT = 10000;
+ private static final int ROW_COUNT = 100;
Review Comment:
TBH, I forget why. But on looking at it again, and rerunning the tests, I've
_increased_ the value to test a part of the `AesCtrDecryptor` that was not
getting tested before.
> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> ------------------------------------------------------------------------
>
> Key: PARQUET-2212
> URL: https://issues.apache.org/jira/browse/PARQUET-2212
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-mr
> Affects Versions: 1.12.3
> Reporter: Parth Chandra
> Priority: Major
> Fix For: 1.12.3
>
>
> BlockCipher.Decryptor currently provides only a decrypt API that takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the
> cost of copying the data into a byte array (and sometimes back to a
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the
> ByteBuffer based api if the buffer is a DirectByteBuffer. If the buffer is a
> HeapByteBuffer, then we can continue to use the byte array API since that
> does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not
> able to use hardware acceleration in JVMs before JDK 17. In those cases, the
> overall decryption speed is faster with byte arrays even after incurring the
> overhead of making a copy.
> The proposal, then, is to enable the use of the ByteBuffer API for
> DirectByteBuffers only, and only if the JDK is JDK 17 or higher or the user
> explicitly configures it.
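> A minimal sketch of that dispatch (illustrative; useOffHeapDecryptBuffer stands in for the new option):
> {code:java}
> ByteBuffer buf = bytes.toByteBuffer();
> if (buf.isDirect() && useOffHeapDecryptBuffer) {
>   // Direct buffer: decrypt in place via the new ByteBuffer API, no copy.
>   bytes = BytesInput.from(decryptor.decrypt(buf, dataPageAAD));
> } else {
>   // Heap buffer: the backing byte array is accessible without a copy, and
>   // the byte-array path is faster on JVMs before JDK 17.
>   bytes = BytesInput.from(decryptor.decrypt(bytes.toByteArray(), dataPageAAD));
> }
> {code}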
--
This message was sent by Atlassian Jira
(v8.20.10#820010)