gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428562074
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1131,7 +1329,19 @@ public OffsetIndex readOffsetIndex(ColumnChunkMetaData
column) throws IOExceptio
return null;
}
f.seek(ref.getOffset());
- return
ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f));
+
+ column.decryptIfNeededed();
Review comment:
Same as above.
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -1035,20 +1154,89 @@ private static void serializeBloomFilters(
long offset = out.getPos();
column.setBloomFilterOffset(offset);
-
Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter),
out);
- bloomFilter.writeTo(out);
+
+ BlockCipher.Encryptor bloomFilterEncryptor = null;
+ byte[] bloomFilterHeaderAAD = null;
+ byte[] bloomFilterBitsetAAD = null;
+ if (null != fileEncryptor) {
+ InternalColumnEncryptionSetup columnEncryptionSetup =
fileEncryptor.getColumnSetup(column.getPath(), false, (short) cIndex);
+ if (columnEncryptionSetup.isEncrypted()) {
+ bloomFilterEncryptor =
columnEncryptionSetup.getMetaDataEncryptor();
+ short columnOrdinal = columnEncryptionSetup.getOrdinal();
+ bloomFilterHeaderAAD =
AesCipher.createModuleAAD(fileEncryptor.getFileAAD(),
ModuleType.BloomFilterHeader,
+ block.getOrdinal(), columnOrdinal, (short)-1);
+ bloomFilterBitsetAAD =
AesCipher.createModuleAAD(fileEncryptor.getFileAAD(),
ModuleType.BloomFilterBitset,
+ block.getOrdinal(), columnOrdinal, (short)-1);
+ }
+ }
+
+
Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter),
out,
+ bloomFilterEncryptor, bloomFilterHeaderAAD);
+
+ ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+ bloomFilter.writeTo(tempOutStream);
+ byte[] serializedBitset = tempOutStream.toByteArray();
+ if (null != bloomFilterEncryptor) {
+ serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset,
bloomFilterBitsetAAD);
+ }
+ out.write(serializedBitset);
}
}
}
-
- private static void serializeFooter(ParquetMetadata footer,
PositionOutputStream out) throws IOException {
- long footerIndex = out.getPos();
+
+ private static void serializeFooter(ParquetMetadata footer,
PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
+
ParquetMetadataConverter metadataConverter = new
ParquetMetadataConverter();
- org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
- writeFileMetaData(parquetMetadata, out);
- LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() -
footerIndex));
- BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
- out.write(MAGIC);
+
+ // Unencrypted file
+ if (null == fileEncryptor) {
+ long footerIndex = out.getPos();
+ org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+ writeFileMetaData(parquetMetadata, out);
+ LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() -
footerIndex));
+ BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+ out.write(MAGIC);
+ return;
+ }
+
+ org.apache.parquet.format.FileMetaData parquetMetadata =
+ metadataConverter.toParquetMetadata(CURRENT_VERSION, footer,
fileEncryptor);
+
+ // Encrypted file with plaintext footer
+ if (!fileEncryptor.isFooterEncrypted()) {
Review comment:
I don't know why we consider the footer length important information, but this is
the only case where we do not log it. We might want to add the logging here as
well.
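For illustration, a minimal sketch of where the logging could go in the plaintext-footer branch; the body of the branch is elided and only the extra `LOG.debug` is shown, assuming the start position is captured in a `footerIndex` variable just like in the unencrypted branch above:
```java
// Sketch only: mirror the debug logging of the unencrypted path.
long footerIndex = out.getPos();
// ... write the plaintext footer (and its signature) as the PR already does ...
LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
```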
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1071,12 +1229,31 @@ public BloomFilterReader
getBloomFilterDataReader(BlockMetaData block) {
*/
public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws
IOException {
long bloomFilterOffset = meta.getBloomFilterOffset();
+
+ if (0 == bloomFilterOffset) { // TODO Junjie - is there a better way to
handle this?
Review comment:
@chenjunjiedada, could you please check this?
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1114,7 +1299,20 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData
column) throws IOExceptio
return null;
}
f.seek(ref.getOffset());
- return
ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(),
Util.readColumnIndex(f));
+
+ column.decryptIfNeededed();
Review comment:
My understanding of this method is that it is invoked inside the
`ColumnChunkMetaData` object when retrieving a value that might be encrypted.
Why do we need to call it here?
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -90,6 +102,8 @@
public static final String PARQUET_METADATA_FILE = "_metadata";
public static final String MAGIC_STR = "PAR1";
public static final byte[] MAGIC =
MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+ public static final String EF_MAGIC_STR = "PARE";
+ public static final byte[] EFMAGIC =
EF_MAGIC_STR.getBytes(Charset.forName("ASCII"));
Review comment:
Just like for `MAGIC`:
```suggestion
  public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
```
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -772,6 +860,11 @@ public void endBlock() throws IOException {
state = state.endBlock();
LOG.debug("{}: end block", out.getPos());
currentBlock.setRowCount(currentRecordCount);
+ int blockSize = blocks.size();
Review comment:
`blockSize` is a bit misleading. It is going to be used as an ordinal, so
what about `rowGroupOrdinal` or similar?
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -34,6 +47,9 @@
* Column meta data for a block stored in the file footer and passed in the
InputSplit
*/
abstract public class ColumnChunkMetaData {
+
+ protected ColumnPath path;
Review comment:
The column path is already part of `properties`, whose whole purpose is
to save memory. If you need this for `EncryptedColumnChunkMetaData`, add it
there instead.
##########
File path:
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static
org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.StringKeyIdRetriever;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+
+
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.rules.TemporaryFolder;
+
+public class TestEncryption {
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ Path root = new Path("target/tests/TestEncryption/");
+ enforceEmptyDir(conf, root);
+
+ Random random = new Random();
+ int numberOfEncryptionModes = 5;
+ FileEncryptionProperties[] encryptionPropertiesList = new
FileEncryptionProperties[numberOfEncryptionModes];
+ FileDecryptionProperties[] decryptionPropertiesList = new
FileDecryptionProperties[numberOfEncryptionModes];
+
+ // #0 Unencrypted - make sure null encryption properties don't break
regular Parquet
+ encryptionPropertiesList[0] = null;
+ decryptionPropertiesList[0] = null;
+
+ // #1 Basic encryption setup
+ byte[] encryptionKey = new byte[16];
+ random.nextBytes(encryptionKey);
+ FileEncryptionProperties encryptionProperties =
FileEncryptionProperties.builder(encryptionKey).build();
+ FileDecryptionProperties decryptionProperties =
FileDecryptionProperties.builder().withFooterKey(encryptionKey).build();
+ encryptionPropertiesList[1] = encryptionProperties;
+ decryptionPropertiesList[1] = decryptionProperties;
+
+ // #2 Default algorithm, non-uniform encryption, key metadata, key
retriever, AAD prefix
+ byte[] footerKey = new byte[16];
+ random.nextBytes(footerKey);
+ byte[] columnKey0 = new byte[16];
+ random.nextBytes(columnKey0);
+ byte[] columnKey1 = new byte[16];
+ random.nextBytes(columnKey1);
+ ColumnEncryptionProperties columnProperties0 =
ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ ColumnEncryptionProperties columnProperties1 =
ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ HashMap<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new
HashMap<ColumnPath, ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ byte[] AADPrefix = root.getName().getBytes(StandardCharsets.UTF_8);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withFooterKeyID("fk")
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ StringKeyIdRetriever keyRetriever = new StringKeyIdRetriever();
+ keyRetriever.putKey("fk", footerKey);
+ keyRetriever.putKey("ck0", columnKey0);
+ keyRetriever.putKey("ck1", columnKey1);
+ decryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(keyRetriever)
+ .build();
+ encryptionPropertiesList[2] = encryptionProperties;
+ decryptionPropertiesList[2] = decryptionProperties;
+
+ // #3 GCM_CTR algorithm, non-uniform encryption, key metadata, key
retriever, AAD
+ columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ columnPropertiesMap = new HashMap<ColumnPath,
ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+ .withFooterKeyID("fk")
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ encryptionPropertiesList[3] = encryptionProperties;
+ decryptionPropertiesList[3] = decryptionProperties; // Same decryption
properties
+
+ // #4 Plaintext footer, default algorithm, key metadata, key retriever,
AAD
+ columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ columnPropertiesMap = new HashMap<ColumnPath,
ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withFooterKeyID("fk")
+ .withPlaintextFooter()
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ encryptionPropertiesList[4] = encryptionProperties;
+ decryptionPropertiesList[4] = decryptionProperties; // Same decryption
properties
+
+
+ MessageType schema = parseMessageType(
+ "message test { "
+ + "required binary binary_field; "
+ + "required int32 int32_field; "
+ + "required int64 int64_field; "
+ + "required boolean boolean_field; "
+ + "required float float_field; "
+ + "required double double_field; "
+ + "required fixed_len_byte_array(3) flba_field; "
+ + "required int96 int96_field; "
+ + "} ");
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+ for (int encryptionMode = 0; encryptionMode < numberOfEncryptionModes;
encryptionMode++) {
+ System.out.println("MODE: "+encryptionMode);
+
+ Path file = new Path(root, "m_" + encryptionMode + ".parquet.encrypted");
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(
+ file,
+ new GroupWriteSupport(),
+ UNCOMPRESSED, 1024, 1024, 512, true, false,
ParquetWriter.DEFAULT_WRITER_VERSION, conf,
+ encryptionPropertiesList[encryptionMode]);
+ for (int i = 0; i < 1000; i++) {
Review comment:
I don't know if it makes much sense to write the same data 1000 times.
Most of our tests work by generating random data in memory (e.g. a
`List<Group>`), writing it out, and then testing whether we can read back the
same data we have in memory.
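A rough sketch of that pattern, reusing the variables from the quoted test (`f`, `random`, `writer`, `file`, `conf`, the properties arrays) and assuming the usual `java.util.List`/`ArrayList` imports; only two fields are compared explicitly to keep it short:
```java
// Sketch: build random rows in memory first, write them, then read the file back
// and compare against the in-memory list instead of re-deriving the expected values.
List<Group> expected = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
  expected.add(f.newGroup()
      .append("binary_field", "test" + random.nextInt(1000000))
      .append("int32_field", random.nextInt())
      .append("int64_field", random.nextLong())
      .append("boolean_field", random.nextBoolean())
      .append("float_field", random.nextFloat())
      .append("double_field", random.nextDouble())
      .append("flba_field", "foo")
      .append("int96_field", Binary.fromConstantByteArray(new byte[12])));
}

for (Group group : expected) {
  writer.write(group);
}
writer.close();

ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
    .withDecryption(decryptionPropertiesList[encryptionMode]).withConf(conf).build();
for (Group expectedGroup : expected) {
  Group group = reader.read();
  assertEquals(expectedGroup.getBinary("binary_field", 0), group.getBinary("binary_field", 0));
  assertEquals(expectedGroup.getInteger("int32_field", 0), group.getInteger("int32_field", 0));
  // ... compare the remaining fields the same way
}
reader.close();
```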
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -165,10 +200,10 @@ protected static boolean positiveLongFitsInAnInt(long
value) {
return (value >= 0) && (value + Integer.MIN_VALUE <= Integer.MAX_VALUE);
}
- private final EncodingStats encodingStats;
+ protected EncodingStats encodingStats;
Review comment:
These are accessed only inside the same class, so you may keep them
`private`.
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
##########
@@ -539,4 +544,13 @@ public OutputCommitter
getOutputCommitter(TaskAttemptContext context)
public synchronized static MemoryManager getMemoryManager() {
return memoryManager;
}
+
+ private FileEncryptionProperties getEncryptionProperties(Configuration
fileHadoopConfig, Path tempFilePath,
Review comment:
I think `createEncryptionProperties` or something similar would be a better
name.
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1095,8 +1272,16 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData
meta) throws IOException
return null;
}
- byte[] bitset = new byte[numBytes];
- f.readFully(bitset);
+ byte[] bitset;
+ if (null == bloomFilterDecryptor) {
+ bitset = new byte[numBytes];
+ f.readFully(bitset);
+ } else {
+ bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+ if (bitset.length != numBytes) {
+ throw new IOException("Wrong length of decrypted bloom filter bitset");
Review comment:
I guess it should be the specific crypto exception instead.
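For illustration, a sketch under the assumption that the crypto package in this PR provides a dedicated exception type; `ParquetCryptoRuntimeException` is an assumed name here, substitute whatever the PR actually defines:
```java
// Sketch: use the crypto-specific exception rather than a generic IOException.
if (bitset.length != numBytes) {
  throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
}
```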
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
##########
@@ -188,7 +189,11 @@ private void checkDeltaByteArrayProblem(FileMetaData meta,
Configuration conf, B
// this is okay if not using DELTA_BYTE_ARRAY with the bug
Set<Encoding> encodings = new HashSet<Encoding>();
for (ColumnChunkMetaData column : block.getColumns()) {
- encodings.addAll(column.getEncodings());
+ try {
+ encodings.addAll(column.getEncodings());
+ } catch (KeyAccessDeniedException e) {
Review comment:
It is not clear to me why we need to suppress this exception.
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -241,6 +280,8 @@ public PrimitiveType getPrimitiveType() {
* @return the stats for this column
*/
abstract public Statistics getStatistics();
+
+ abstract public void decryptIfNeededed();
Review comment:
The design seems to require this method to be invoked only inside this
package, so please make it _package private_.
You may have an empty implementation here so that you don't need to add the empty
implementations in the Int/Long classes.
Also, a bit too many "_ed_"s:
```suggestion
abstract public void decryptIfNeeded();
```
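A minimal sketch of the suggested shape, package-private with a no-op default so only the encrypted subclass has to override it:
```java
// Sketch: package-private, empty by default; EncryptedColumnChunkMetaData overrides it,
// while the Int/Long implementations simply inherit the no-op.
void decryptIfNeeded() {
  // nothing to do for plaintext column chunk metadata
}
```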
##########
File path:
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static
org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.StringKeyIdRetriever;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+
+
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.rules.TemporaryFolder;
+
+public class TestEncryption {
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ Path root = new Path("target/tests/TestEncryption/");
Review comment:
I would suggest creating a temporary dir in the temporary space instead of
using a relative path that depends on the current directory; the current
approach is a bit error prone.
(There are a couple of ways to create and clean up temporary directories in
JUnit.)
##########
File path:
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
##########
@@ -0,0 +1,313 @@
+/*
Review comment:
I guess this test and the one for column indexes were created by
copy-pasting the original tests and adding the encryption. Since we already use
parameterized testing in both, I would suggest keeping the original tests and
adding another dimension for _plain_/_encrypted_ or similar. This way we would
have 4 runs for each test: (V1 with plain), (V1 with encryption), (V2 with
plain) and (V2 with encryption).
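A rough sketch of that extra dimension with JUnit's `Parameterized` runner (assuming the usual `java.util.Arrays`/`Collection` imports); the `encrypted` flag and the exact parameter wiring are assumptions, the existing tests may structure their parameters differently:
```java
// Sketch: run each existing case both with and without encryption.
@Parameterized.Parameters(name = "{0}, encrypted={1}")
public static Collection<Object[]> params() {
  return Arrays.asList(new Object[][] {
      { ParquetProperties.WriterVersion.PARQUET_1_0, false },
      { ParquetProperties.WriterVersion.PARQUET_1_0, true },
      { ParquetProperties.WriterVersion.PARQUET_2_0, false },
      { ParquetProperties.WriterVersion.PARQUET_2_0, true },
  });
}
```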
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
##########
@@ -194,6 +195,23 @@ public ParquetWriter(
enableDictionary, validating, writerVersion, conf);
}
+ @Deprecated
+ public ParquetWriter(
Review comment:
Similarly to another one of my comments: instead of adding more
deprecated methods/constructors, try to use the non-deprecated ones in the
code. If they are not used internally, we should not introduce these.
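For example, a hedged sketch using the existing `ExampleParquetWriter` builder instead of another deprecated constructor; `withEncryption(...)` is a hypothetical name for whatever hook this PR adds to the builder:
```java
// Sketch: prefer the builder API; withEncryption(...) is a hypothetical builder method.
ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
    .withConf(conf)
    .withCompressionCodec(UNCOMPRESSED)
    .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
    .withEncryption(encryptionProperties) // hypothetical
    .build();
```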
##########
File path:
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -141,11 +159,28 @@ public static ColumnChunkMetaData get(
totalUncompressedSize);
}
}
+
+ public static ColumnChunkMetaData
getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter,
ColumnPath path,
Review comment:
I would expect some comments here on why we need this object (to decrypt
this metadata lazily instead of simply decrypting it at read time).
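Something like the following Javadoc would cover it; this is wording only, based on the lazy-decryption rationale stated above:
```java
/**
 * Creates metadata for a column chunk whose Thrift metadata is still encrypted.
 * The encrypted bytes are kept as-is and are only decrypted lazily, the first time
 * one of the metadata values is actually requested, instead of eagerly while the
 * footer is being read.
 */
```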
##########
File path:
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static
org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.StringKeyIdRetriever;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+
+
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.rules.TemporaryFolder;
+
+public class TestEncryption {
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ Path root = new Path("target/tests/TestEncryption/");
+ enforceEmptyDir(conf, root);
+
+ Random random = new Random();
+ int numberOfEncryptionModes = 5;
+ FileEncryptionProperties[] encryptionPropertiesList = new
FileEncryptionProperties[numberOfEncryptionModes];
+ FileDecryptionProperties[] decryptionPropertiesList = new
FileDecryptionProperties[numberOfEncryptionModes];
+
+ // #0 Unencrypted - make sure null encryption properties don't break
regular Parquet
+ encryptionPropertiesList[0] = null;
+ decryptionPropertiesList[0] = null;
+
+ // #1 Basic encryption setup
+ byte[] encryptionKey = new byte[16];
+ random.nextBytes(encryptionKey);
+ FileEncryptionProperties encryptionProperties =
FileEncryptionProperties.builder(encryptionKey).build();
+ FileDecryptionProperties decryptionProperties =
FileDecryptionProperties.builder().withFooterKey(encryptionKey).build();
+ encryptionPropertiesList[1] = encryptionProperties;
+ decryptionPropertiesList[1] = decryptionProperties;
+
+ // #2 Default algorithm, non-uniform encryption, key metadata, key
retriever, AAD prefix
+ byte[] footerKey = new byte[16];
+ random.nextBytes(footerKey);
+ byte[] columnKey0 = new byte[16];
+ random.nextBytes(columnKey0);
+ byte[] columnKey1 = new byte[16];
+ random.nextBytes(columnKey1);
+ ColumnEncryptionProperties columnProperties0 =
ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ ColumnEncryptionProperties columnProperties1 =
ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ HashMap<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new
HashMap<ColumnPath, ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ byte[] AADPrefix = root.getName().getBytes(StandardCharsets.UTF_8);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withFooterKeyID("fk")
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ StringKeyIdRetriever keyRetriever = new StringKeyIdRetriever();
+ keyRetriever.putKey("fk", footerKey);
+ keyRetriever.putKey("ck0", columnKey0);
+ keyRetriever.putKey("ck1", columnKey1);
+ decryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(keyRetriever)
+ .build();
+ encryptionPropertiesList[2] = encryptionProperties;
+ decryptionPropertiesList[2] = decryptionProperties;
+
+ // #3 GCM_CTR algorithm, non-uniform encryption, key metadata, key
retriever, AAD
+ columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ columnPropertiesMap = new HashMap<ColumnPath,
ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+ .withFooterKeyID("fk")
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ encryptionPropertiesList[3] = encryptionProperties;
+ decryptionPropertiesList[3] = decryptionProperties; // Same decryption
properties
+
+ // #4 Plaintext footer, default algorithm, key metadata, key retriever,
AAD
+ columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ columnPropertiesMap = new HashMap<ColumnPath,
ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withFooterKeyID("fk")
+ .withPlaintextFooter()
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ encryptionPropertiesList[4] = encryptionProperties;
+ decryptionPropertiesList[4] = decryptionProperties; // Same decryption
properties
+
+
+ MessageType schema = parseMessageType(
+ "message test { "
+ + "required binary binary_field; "
+ + "required int32 int32_field; "
+ + "required int64 int64_field; "
+ + "required boolean boolean_field; "
+ + "required float float_field; "
+ + "required double double_field; "
+ + "required fixed_len_byte_array(3) flba_field; "
+ + "required int96 int96_field; "
+ + "} ");
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+ for (int encryptionMode = 0; encryptionMode < numberOfEncryptionModes;
encryptionMode++) {
+ System.out.println("MODE: "+encryptionMode);
+
+ Path file = new Path(root, "m_" + encryptionMode + ".parquet.encrypted");
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(
+ file,
+ new GroupWriteSupport(),
+ UNCOMPRESSED, 1024, 1024, 512, true, false,
ParquetWriter.DEFAULT_WRITER_VERSION, conf,
+ encryptionPropertiesList[encryptionMode]);
+ for (int i = 0; i < 1000; i++) {
+ writer.write(
+ f.newGroup()
+ .append("binary_field", "test" + i)
+ .append("int32_field", 32)
+ .append("int64_field", 64l)
+ .append("boolean_field", true)
+ .append("float_field", 1.0f)
+ .append("double_field", 2.0d)
+ .append("flba_field", "foo")
+ .append("int96_field", Binary.fromConstantByteArray(new
byte[12])));
+ }
+ writer.close();
+
+ FileDecryptionProperties fileDecryptionProperties =
decryptionPropertiesList[encryptionMode];
+ ParquetReader<Group> reader = ParquetReader.builder(new
GroupReadSupport(), file)
+ .withDecryption(fileDecryptionProperties).withConf(conf).build();
+ for (int i = 0; i < 1000; i++) {
+ Group group = null;
+ group= reader.read();
+ assertEquals("test" + i, group.getBinary("binary_field",
0).toStringUsingUTF8());
+ assertEquals(32, group.getInteger("int32_field", 0));
+ assertEquals(64l, group.getLong("int64_field", 0));
+ assertEquals(true, group.getBoolean("boolean_field", 0));
+ assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+ assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+ assertEquals("foo", group.getBinary("flba_field",
0).toStringUsingUTF8());
+ assertEquals(Binary.fromConstantByteArray(new byte[12]),
+ group.getInt96("int96_field",0));
+ }
+ reader.close();
+ }
+ enforceEmptyDir(conf, root);
+ }
+
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
Review comment:
Yep, that's one of the good ways I was talking about, but you should use `temp`
instead of hardcoding the directory at the beginning.
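A small sketch of that, letting the rule own the directory and converting to a Hadoop `Path` via the absolute path of the created folder:
```java
@Rule
public TemporaryFolder temp = new TemporaryFolder();

@Test
public void test() throws Exception {
  Configuration conf = new Configuration();
  // Sketch: no hardcoded target/tests/... path and no manual enforceEmptyDir needed.
  Path root = new Path(temp.newFolder("TestEncryption").getAbsolutePath());
  // ... rest of the test as above ...
}
```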
##########
File path:
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static
org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.StringKeyIdRetriever;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+
+
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.rules.TemporaryFolder;
+
+public class TestEncryption {
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ Path root = new Path("target/tests/TestEncryption/");
+ enforceEmptyDir(conf, root);
+
+ Random random = new Random();
+ int numberOfEncryptionModes = 5;
+ FileEncryptionProperties[] encryptionPropertiesList = new
FileEncryptionProperties[numberOfEncryptionModes];
+ FileDecryptionProperties[] decryptionPropertiesList = new
FileDecryptionProperties[numberOfEncryptionModes];
+
+ // #0 Unencrypted - make sure null encryption properties don't break
regular Parquet
+ encryptionPropertiesList[0] = null;
+ decryptionPropertiesList[0] = null;
+
+ // #1 Basic encryption setup
+ byte[] encryptionKey = new byte[16];
+ random.nextBytes(encryptionKey);
+ FileEncryptionProperties encryptionProperties =
FileEncryptionProperties.builder(encryptionKey).build();
+ FileDecryptionProperties decryptionProperties =
FileDecryptionProperties.builder().withFooterKey(encryptionKey).build();
+ encryptionPropertiesList[1] = encryptionProperties;
+ decryptionPropertiesList[1] = decryptionProperties;
+
+ // #2 Default algorithm, non-uniform encryption, key metadata, key
retriever, AAD prefix
+ byte[] footerKey = new byte[16];
+ random.nextBytes(footerKey);
+ byte[] columnKey0 = new byte[16];
+ random.nextBytes(columnKey0);
+ byte[] columnKey1 = new byte[16];
+ random.nextBytes(columnKey1);
+ ColumnEncryptionProperties columnProperties0 =
ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ ColumnEncryptionProperties columnProperties1 =
ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ HashMap<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new
HashMap<ColumnPath, ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ byte[] AADPrefix = root.getName().getBytes(StandardCharsets.UTF_8);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withFooterKeyID("fk")
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ StringKeyIdRetriever keyRetriever = new StringKeyIdRetriever();
+ keyRetriever.putKey("fk", footerKey);
+ keyRetriever.putKey("ck0", columnKey0);
+ keyRetriever.putKey("ck1", columnKey1);
+ decryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(keyRetriever)
+ .build();
+ encryptionPropertiesList[2] = encryptionProperties;
+ decryptionPropertiesList[2] = decryptionProperties;
+
+ // #3 GCM_CTR algorithm, non-uniform encryption, key metadata, key
retriever, AAD
+ columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ columnPropertiesMap = new HashMap<ColumnPath,
ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+ .withFooterKeyID("fk")
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ encryptionPropertiesList[3] = encryptionProperties;
+ decryptionPropertiesList[3] = decryptionProperties; // Same decryption
properties
+
+ // #4 Plaintext footer, default algorithm, key metadata, key retriever,
AAD
+ columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+ .withKey(columnKey0)
+ .withKeyID("ck0")
+ .build();
+ columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+ .withKey(columnKey1)
+ .withKeyID("ck1")
+ .build();
+ columnPropertiesMap = new HashMap<ColumnPath,
ColumnEncryptionProperties>();
+ columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ encryptionProperties = FileEncryptionProperties.builder(footerKey)
+ .withFooterKeyID("fk")
+ .withPlaintextFooter()
+ .withAADPrefix(AADPrefix)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+ encryptionPropertiesList[4] = encryptionProperties;
+ decryptionPropertiesList[4] = decryptionProperties; // Same decryption
properties
+
+
+ MessageType schema = parseMessageType(
+ "message test { "
+ + "required binary binary_field; "
+ + "required int32 int32_field; "
+ + "required int64 int64_field; "
+ + "required boolean boolean_field; "
+ + "required float float_field; "
+ + "required double double_field; "
+ + "required fixed_len_byte_array(3) flba_field; "
+ + "required int96 int96_field; "
+ + "} ");
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+ for (int encryptionMode = 0; encryptionMode < numberOfEncryptionModes;
encryptionMode++) {
+ System.out.println("MODE: "+encryptionMode);
+
+ Path file = new Path(root, "m_" + encryptionMode + ".parquet.encrypted");
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(
+ file,
+ new GroupWriteSupport(),
+ UNCOMPRESSED, 1024, 1024, 512, true, false,
ParquetWriter.DEFAULT_WRITER_VERSION, conf,
+ encryptionPropertiesList[encryptionMode]);
+ for (int i = 0; i < 1000; i++) {
+ writer.write(
+ f.newGroup()
+ .append("binary_field", "test" + i)
+ .append("int32_field", 32)
+ .append("int64_field", 64l)
+ .append("boolean_field", true)
+ .append("float_field", 1.0f)
+ .append("double_field", 2.0d)
+ .append("flba_field", "foo")
+ .append("int96_field", Binary.fromConstantByteArray(new
byte[12])));
+ }
+ writer.close();
Review comment:
Consider using try-with-resources. That construct would be less error
prone (e.g. it closes the writer/reader even if an exception occurs).
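A sketch of the same write/read steps with try-with-resources; both `ParquetWriter` and `ParquetReader` are `Closeable`, so this should work with the calls already used in the test:
```java
// Sketch: writer and reader are closed even if an exception or failed assertion occurs.
try (ParquetWriter<Group> writer = new ParquetWriter<Group>(
    file,
    new GroupWriteSupport(),
    UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetWriter.DEFAULT_WRITER_VERSION, conf,
    encryptionPropertiesList[encryptionMode])) {
  for (int i = 0; i < 1000; i++) {
    writer.write(
        f.newGroup()
            .append("binary_field", "test" + i)
            .append("int32_field", 32)
            .append("int64_field", 64L)
            .append("boolean_field", true)
            .append("float_field", 1.0f)
            .append("double_field", 2.0d)
            .append("flba_field", "foo")
            .append("int96_field", Binary.fromConstantByteArray(new byte[12])));
  }
}

try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
    .withDecryption(decryptionPropertiesList[encryptionMode]).withConf(conf).build()) {
  for (int i = 0; i < 1000; i++) {
    Group group = reader.read();
    assertEquals("test" + i, group.getBinary("binary_field", 0).toStringUsingUTF8());
    // ... remaining assertions as in the original loop
  }
}
```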
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]