gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428562074



##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1131,7 +1329,19 @@ public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOExceptio
       return null;
     }
     f.seek(ref.getOffset());
-    return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f));
+
+    column.decryptIfNeededed();

Review comment:
       Same as above.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -1035,20 +1154,89 @@ private static void serializeBloomFilters(
 
         long offset = out.getPos();
         column.setBloomFilterOffset(offset);
-        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out);
-        bloomFilter.writeTo(out);
+        
+        BlockCipher.Encryptor bloomFilterEncryptor = null;
+        byte[] bloomFilterHeaderAAD = null;
+        byte[] bloomFilterBitsetAAD = null;
+        if (null != fileEncryptor) {
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, (short) cIndex);
+          if (columnEncryptionSetup.isEncrypted()) {
+            bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+            short columnOrdinal = columnEncryptionSetup.getOrdinal();
+            bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
+                block.getOrdinal(), columnOrdinal, (short)-1);
+            bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
+                block.getOrdinal(), columnOrdinal, (short)-1);
+          }
+        }
+        
+        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out,
+            bloomFilterEncryptor, bloomFilterHeaderAAD);
+        
+        ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+        bloomFilter.writeTo(tempOutStream);
+        byte[] serializedBitset = tempOutStream.toByteArray();
+        if (null != bloomFilterEncryptor) {
+          serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
+        }
+        out.write(serializedBitset);
       }
     }
   }
-
-  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
-    long footerIndex = out.getPos();
+  
+  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
+    
     ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-    org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
-    writeFileMetaData(parquetMetadata, out);
-    LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
-    out.write(MAGIC);
+    
+    // Unencrypted file
+    if (null == fileEncryptor) {
+      long footerIndex = out.getPos();
+      org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+      writeFileMetaData(parquetMetadata, out);
+      LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
+      BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+      out.write(MAGIC);
+      return;
+    }
+    
+    org.apache.parquet.format.FileMetaData parquetMetadata =
+        metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
+    
+    // Encrypted file with plaintext footer 
+    if (!fileEncryptor.isFooterEncrypted()) {

Review comment:
      I don't know why we think that the footer length is important information, but this is the only case where we do not log it. We might want to add the logging here as well.
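   For example, mirroring the debug line of the unencrypted branch (a sketch only; `footerIndex` would be captured before the footer is written):
   ```java
   long footerIndex = out.getPos();
   // ... write the plaintext footer and its signature ...
   LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
   ```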

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1071,12 +1229,31 @@ public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
    */
   public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
     long bloomFilterOffset = meta.getBloomFilterOffset();
+
+    if (0 == bloomFilterOffset) { // TODO Junjie - is there a better way to handle this?

Review comment:
       @chenjunjiedada, could you please check this?

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1114,7 +1299,20 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOExceptio
       return null;
     }
     f.seek(ref.getOffset());
-    return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), Util.readColumnIndex(f));
+
+    column.decryptIfNeededed();

Review comment:
      My understanding of this method is that it is invoked inside the `ColumnChunkMetaData` object when retrieving a value that might be encrypted. Why do we need to call it here?

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -90,6 +102,8 @@
   public static final String PARQUET_METADATA_FILE = "_metadata";
   public static final String MAGIC_STR = "PAR1";
   public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+  public static final String EF_MAGIC_STR = "PARE";
+  public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(Charset.forName("ASCII"));

Review comment:
       Just like for `MAGIC`:
   ```suggestion
     public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
   ```

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -772,6 +860,11 @@ public void endBlock() throws IOException {
     state = state.endBlock();
     LOG.debug("{}: end block", out.getPos());
     currentBlock.setRowCount(currentRecordCount);
+    int blockSize = blocks.size();

Review comment:
       `blockSize` is a bit misleading. It is going to be used as an ordinal, so what about `rowGroupOrdinal` or similar?
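   Something like (later uses of the variable would be renamed accordingly):
   ```suggestion
       int rowGroupOrdinal = blocks.size();
   ```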

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -34,6 +47,9 @@
  * Column meta data for a block stored in the file footer and passed in the InputSplit
  */
 abstract public class ColumnChunkMetaData {
+  
+  protected ColumnPath path;

Review comment:
      The column path is already part of `properties`, whose whole purpose is to save memory. If you need this for `EncryptedColumnChunkMetaData`, add it there instead.
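   E.g. (sketch):
   ```java
   // in EncryptedColumnChunkMetaData only; the unencrypted variants keep getting the path from properties
   private final ColumnPath path;
   ```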

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.StringKeyIdRetriever;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+
+
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.rules.TemporaryFolder;
+
+public class TestEncryption {
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    Path root = new Path("target/tests/TestEncryption/");
+    enforceEmptyDir(conf, root);
+
+    Random random = new Random();
+    int numberOfEncryptionModes = 5;
+    FileEncryptionProperties[] encryptionPropertiesList = new FileEncryptionProperties[numberOfEncryptionModes];
+    FileDecryptionProperties[] decryptionPropertiesList = new FileDecryptionProperties[numberOfEncryptionModes];
+
+    // #0 Unencrypted - make sure null encryption properties don't break regular Parquet
+    encryptionPropertiesList[0] = null;
+    decryptionPropertiesList[0] = null;
+
+    // #1 Basic encryption setup
+    byte[] encryptionKey = new byte[16];
+    random.nextBytes(encryptionKey);
+    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(encryptionKey).build();
+    FileDecryptionProperties decryptionProperties = FileDecryptionProperties.builder().withFooterKey(encryptionKey).build();
+    encryptionPropertiesList[1] = encryptionProperties;
+    decryptionPropertiesList[1] = decryptionProperties;
+
+    // #2 Default algorithm, non-uniform encryption, key metadata, key retriever, AAD prefix
+    byte[] footerKey = new byte[16];
+    random.nextBytes(footerKey);
+    byte[] columnKey0 = new byte[16];
+    random.nextBytes(columnKey0);
+    byte[] columnKey1 = new byte[16];
+    random.nextBytes(columnKey1);
+    ColumnEncryptionProperties columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+        .withKey(columnKey0)
+        .withKeyID("ck0")
+        .build();
+    ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+        .withKey(columnKey1)
+        .withKeyID("ck1")
+        .build();
+    HashMap<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<ColumnPath, ColumnEncryptionProperties>();
+    columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+    byte[] AADPrefix = root.getName().getBytes(StandardCharsets.UTF_8);
+    encryptionProperties = FileEncryptionProperties.builder(footerKey)
+        .withFooterKeyID("fk")
+        .withAADPrefix(AADPrefix)
+        .withEncryptedColumns(columnPropertiesMap)
+        .build();
+    StringKeyIdRetriever keyRetriever = new StringKeyIdRetriever();
+    keyRetriever.putKey("fk", footerKey);
+    keyRetriever.putKey("ck0", columnKey0);
+    keyRetriever.putKey("ck1", columnKey1);
+    decryptionProperties = FileDecryptionProperties.builder()
+        .withKeyRetriever(keyRetriever)
+        .build();
+    encryptionPropertiesList[2] = encryptionProperties;
+    decryptionPropertiesList[2] = decryptionProperties;
+
+    // #3 GCM_CTR algorithm, non-uniform encryption, key metadata, key retriever, AAD
+    columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+        .withKey(columnKey0)
+        .withKeyID("ck0")
+        .build();
+    columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+        .withKey(columnKey1)
+        .withKeyID("ck1")
+        .build();
+    columnPropertiesMap = new HashMap<ColumnPath, ColumnEncryptionProperties>();
+    columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+    encryptionProperties = FileEncryptionProperties.builder(footerKey)
+        .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+        .withFooterKeyID("fk")
+        .withAADPrefix(AADPrefix)
+        .withEncryptedColumns(columnPropertiesMap)
+        .build();
+    encryptionPropertiesList[3] = encryptionProperties;
+    decryptionPropertiesList[3] = decryptionProperties; // Same decryption properties
+
+    // #4  Plaintext footer, default algorithm, key metadata, key retriever, AAD
+    columnProperties0 = ColumnEncryptionProperties.builder("binary_field")
+        .withKey(columnKey0)
+        .withKeyID("ck0")
+        .build();
+    columnProperties1 = ColumnEncryptionProperties.builder("int32_field")
+        .withKey(columnKey1)
+        .withKeyID("ck1")
+        .build();
+    columnPropertiesMap = new HashMap<ColumnPath, ColumnEncryptionProperties>();
+    columnPropertiesMap.put(columnProperties0.getPath(), columnProperties0);
+    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+    encryptionProperties = FileEncryptionProperties.builder(footerKey)
+        .withFooterKeyID("fk")
+        .withPlaintextFooter()
+        .withAADPrefix(AADPrefix)
+        .withEncryptedColumns(columnPropertiesMap)
+        .build();
+    encryptionPropertiesList[4] = encryptionProperties;
+    decryptionPropertiesList[4] = decryptionProperties; // Same decryption properties
+
+
+    MessageType schema = parseMessageType(
+        "message test { "
+            + "required binary binary_field; "
+            + "required int32 int32_field; "
+            + "required int64 int64_field; "
+            + "required boolean boolean_field; "
+            + "required float float_field; "
+            + "required double double_field; "
+            + "required fixed_len_byte_array(3) flba_field; "
+            + "required int96 int96_field; "
+            + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    for (int encryptionMode = 0; encryptionMode < numberOfEncryptionModes; encryptionMode++) {
+      System.out.println("MODE: "+encryptionMode);
+      
+      Path file = new Path(root, "m_" + encryptionMode + ".parquet.encrypted");
+      ParquetWriter<Group> writer = new ParquetWriter<Group>(
+          file,
+          new GroupWriteSupport(),
+          UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetWriter.DEFAULT_WRITER_VERSION, conf,
+          encryptionPropertiesList[encryptionMode]);
+      for (int i = 0; i < 1000; i++) {

Review comment:
      I don't know if it makes much sense to write the same data 1000 times. Most of our tests work by generating random data in memory (e.g. a `List<Group>`), then writing it out and testing whether we can read back the same data we have in memory.
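   For example (a sketch only, reusing the schema and `SimpleGroupFactory f` from this test; needs `java.util.List`/`ArrayList` imports):
   ```java
   List<Group> testData = new ArrayList<>();
   Random rnd = new Random(42); // fixed seed keeps failures reproducible
   for (int i = 0; i < 1000; i++) {
     testData.add(f.newGroup()
         .append("binary_field", "test" + rnd.nextInt())
         .append("int32_field", rnd.nextInt())
         .append("int64_field", rnd.nextLong())); // remaining fields analogously
   }
   for (Group group : testData) {
     writer.write(group);
   }
   // on the read side, assert that the i-th Group read back matches testData.get(i)
   ```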

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -165,10 +200,10 @@ protected static boolean positiveLongFitsInAnInt(long value) {
     return (value >= 0) && (value + Integer.MIN_VALUE <= Integer.MAX_VALUE);
   }
 
-  private final EncodingStats encodingStats;
+  protected EncodingStats encodingStats;

Review comment:
      These are accessed only inside the same class, so you may keep them `private`.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
##########
@@ -539,4 +544,13 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
   public synchronized static MemoryManager getMemoryManager() {
     return memoryManager;
   }
+  
+  private FileEncryptionProperties getEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath, 

Review comment:
      I think `createEncryptionProperties` or similar would be a better name.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1095,8 +1272,16 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException
       return null;
     }
 
-    byte[] bitset = new byte[numBytes];
-    f.readFully(bitset);
+    byte[] bitset;
+    if (null == bloomFilterDecryptor) {
+      bitset = new byte[numBytes];
+      f.readFully(bitset);
+    } else {
+      bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+      if (bitset.length != numBytes) {
+        throw new IOException("Wrong length of decrypted bloom filter bitset");

Review comment:
       I guess it should be the specific crypto exception instead.
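   For example (assuming the crypto module's `ParquetCryptoRuntimeException` is the intended type):
   ```java
   throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
   ```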

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
##########
@@ -188,7 +189,11 @@ private void checkDeltaByteArrayProblem(FileMetaData meta, Configuration conf, B
       // this is okay if not using DELTA_BYTE_ARRAY with the bug
       Set<Encoding> encodings = new HashSet<Encoding>();
       for (ColumnChunkMetaData column : block.getColumns()) {
-        encodings.addAll(column.getEncodings());
+        try {
+          encodings.addAll(column.getEncodings());
+        } catch (KeyAccessDeniedException e) {

Review comment:
       It is not clear to me why we need to suppress this exception.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -241,6 +280,8 @@ public PrimitiveType getPrimitiveType() {
    * @return the stats for this column
    */
   abstract public Statistics getStatistics();
+  
+  abstract public void decryptIfNeededed();

Review comment:
      The design seems to require this method to be invoked only inside this package, so please make it _package private_.
   
   You may have an empty implementation here so you don't need to add the empty implementations in the Int/Long classes.
   
   Also, a bit too many "_ed_"s:
   ```suggestion
     abstract public void decryptIfNeeded();
   ```

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
[… license header, package declaration, and imports identical to the TestEncryption hunk quoted above …]
+public class TestEncryption {
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    Path root = new Path("target/tests/TestEncryption/");

Review comment:
      I would suggest creating a temporary dir in temporary space instead of using a relative path that depends on the current directory; the current approach is a bit error prone.
   (There are a couple of ways to create and clean up temporary directories in JUnit.)
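   For example, with the JUnit 4 rule (it creates the directory before each test and deletes it afterwards):
   ```java
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
   ```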

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
##########
@@ -0,0 +1,313 @@
+/*

Review comment:
      I guess this test and the one for column indexes were created by copy-pasting the original tests and adding the encryption. Since we already use parameterized testing in both, I would suggest keeping the original tests and adding another dimension for _plain_/_encrypted_ or similar. This way we would have 4 runs for each test: (V1 with plain), (V1 with encryption), (V2 with plain) and (V2 with encryption).
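   Roughly (a sketch only; the actual parameter lists of these tests will differ):
   ```java
   @Parameterized.Parameters(name = "{0}, encrypted={1}")
   public static Collection<Object[]> params() {
     List<Object[]> params = new ArrayList<>();
     for (WriterVersion version : Arrays.asList(WriterVersion.PARQUET_1_0, WriterVersion.PARQUET_2_0)) {
       for (boolean encrypted : new boolean[] { false, true }) {
         params.add(new Object[] { version, encrypted }); // each run gets a (version, encrypted) pair
       }
     }
     return params;
   }
   ```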

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
##########
@@ -194,6 +195,23 @@ public ParquetWriter(
         enableDictionary, validating, writerVersion, conf);
   }
 
+  @Deprecated
+  public ParquetWriter(

Review comment:
      Similarly to another one of my comments: instead of adding more deprecated methods/constructors, try to use the non-deprecated ones in the code. If they are not used internally, we should not introduce them.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
##########
@@ -141,11 +159,28 @@ public static ColumnChunkMetaData get(
           totalUncompressedSize);
     }
   }
+  
+  public static ColumnChunkMetaData getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, 

Review comment:
      I would expect some comments here on why we need this object (to decrypt the metadata lazily instead of simply decrypting it at read time).
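   Something along these lines (wording is a sketch only):
   ```java
   /**
    * Creates a ColumnChunkMetaData object that keeps the column metadata encrypted and
    * decrypts it lazily, on first access, so that a footer containing columns the reader
    * has no key for can still be parsed.
    */
   ```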

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
[… license header, imports, encryption-mode setup, and schema identical to the TestEncryption hunk quoted above …]
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    for (int encryptionMode = 0; encryptionMode < numberOfEncryptionModes; encryptionMode++) {
+      System.out.println("MODE: "+encryptionMode);
+      
+      Path file = new Path(root, "m_" + encryptionMode + ".parquet.encrypted");
+      ParquetWriter<Group> writer = new ParquetWriter<Group>(
+          file,
+          new GroupWriteSupport(),
+          UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetWriter.DEFAULT_WRITER_VERSION, conf,
+          encryptionPropertiesList[encryptionMode]);
+      for (int i = 0; i < 1000; i++) {
+        writer.write(
+            f.newGroup()
+            .append("binary_field", "test" + i)
+            .append("int32_field", 32)
+            .append("int64_field", 64l)
+            .append("boolean_field", true)
+            .append("float_field", 1.0f)
+            .append("double_field", 2.0d)
+            .append("flba_field", "foo")
+            .append("int96_field", Binary.fromConstantByteArray(new 
byte[12])));
+      }
+      writer.close();
+
+      FileDecryptionProperties fileDecryptionProperties = decryptionPropertiesList[encryptionMode];
+      ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), file)
+          .withDecryption(fileDecryptionProperties).withConf(conf).build();
+      for (int i = 0; i < 1000; i++) {
+        Group group = null;
+        group= reader.read();
+        assertEquals("test" + i, group.getBinary("binary_field", 
0).toStringUsingUTF8());
+        assertEquals(32, group.getInteger("int32_field", 0));
+        assertEquals(64l, group.getLong("int64_field", 0));
+        assertEquals(true, group.getBoolean("boolean_field", 0));
+        assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+        assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+        assertEquals("foo", group.getBinary("flba_field", 
0).toStringUsingUTF8());
+        assertEquals(Binary.fromConstantByteArray(new byte[12]),
+            group.getInt96("int96_field",0));
+      }
+      reader.close();
+    }
+    enforceEmptyDir(conf, root);
+  }
+
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();

Review comment:
      Yep, that's one of the good ways I was talking about, but you should use `temp` instead of hardcoding the directory at the beginning.
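   E.g.:
   ```java
   Path root = new Path(temp.getRoot().getPath());
   ```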

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryption.java
##########
@@ -0,0 +1,215 @@
[… license header, imports, encryption-mode setup, and schema identical to the TestEncryption hunk quoted above …]
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    for (int encryptionMode = 0; encryptionMode < numberOfEncryptionModes; encryptionMode++) {
+      System.out.println("MODE: "+encryptionMode);
+      
+      Path file = new Path(root, "m_" + encryptionMode + ".parquet.encrypted");
+      ParquetWriter<Group> writer = new ParquetWriter<Group>(
+          file,
+          new GroupWriteSupport(),
+          UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetWriter.DEFAULT_WRITER_VERSION, conf,
+          encryptionPropertiesList[encryptionMode]);
+      for (int i = 0; i < 1000; i++) {
+        writer.write(
+            f.newGroup()
+            .append("binary_field", "test" + i)
+            .append("int32_field", 32)
+            .append("int64_field", 64l)
+            .append("boolean_field", true)
+            .append("float_field", 1.0f)
+            .append("double_field", 2.0d)
+            .append("flba_field", "foo")
+            .append("int96_field", Binary.fromConstantByteArray(new 
byte[12])));
+      }
+      writer.close();

Review comment:
      Consider using try-with-resources. That construct is less error prone (e.g. it closes the writer/reader even if an exception occurs).
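   E.g. (a sketch based on the quoted code; `ParquetWriter` and `ParquetReader` both implement `Closeable`):
   ```java
   try (ParquetWriter<Group> writer = new ParquetWriter<Group>(
       file,
       new GroupWriteSupport(),
       UNCOMPRESSED, 1024, 1024, 512, true, false,
       ParquetWriter.DEFAULT_WRITER_VERSION, conf,
       encryptionPropertiesList[encryptionMode])) {
     for (int i = 0; i < 1000; i++) {
       writer.write(f.newGroup()
           .append("binary_field", "test" + i)
           .append("int32_field", 32)); // remaining fields as in the original loop
     }
   } // close() runs automatically, even if an exception is thrown
   ```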




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

