[ https://issues.apache.org/jira/browse/PARQUET-1373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133335#comment-17133335 ]

ASF GitHub Bot commented on PARQUET-1373:
-----------------------------------------

gszadovszky commented on a change in pull request #615:
URL: https://github.com/apache/parquet-mr/pull/615#discussion_r435236094



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyWithMasterID;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public class FileKeyUnwrapper implements DecryptionKeyRetriever {
+  // For every token: a map of KEK_ID to KEK bytes
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String,byte[]>>> KEKMapPerToken = new ConcurrentHashMap<>();

Review comment:
       As per the Java naming conventions for constants (`static final`):
   ```suggestion
     private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String,byte[]>>> KEK_MAP_PER_TOKEN = new ConcurrentHashMap<>();
   
   ```

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
##########
@@ -0,0 +1,216 @@
+public class FileKeyUnwrapper implements DecryptionKeyRetriever {
+  // For every token: a map of KEK_ID to KEK bytes
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String,byte[]>>> KEKMapPerToken = new ConcurrentHashMap<>();
+  private volatile static long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute
+  //A map of KEK_ID to KEK - for the current token
+  private final ConcurrentMap<String,byte[]> KEKPerKekID;

Review comment:
       nit: should start with lower case. Though the naming conventions for 
acronyms are not clear, I would vote for the following.
   ```suggestion
     private final ConcurrentMap<String,byte[]> kekPerKekID;
   ```

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
##########
@@ -0,0 +1,227 @@
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyEncryptionKey;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class FileKeyWrapper {
+
+  public static final int KEK_LENGTH = 16;
+  public static final int KEK_ID_LENGTH = 16;
+
+  // For every token: a map of MEK_ID to (KEK ID and KEK)
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>>> KEKMapPerToken =
+      new ConcurrentHashMap<>(KeyToolkit.INITIAL_PER_TOKEN_CACHE_SIZE);
+  private static volatile long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute;
+
+  //A map of MEK_ID to (KEK ID and KEK) - for the current token
+  private final ConcurrentMap<String, KeyEncryptionKey> KEKPerMasterKeyID;
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private final long cacheEntryLifetime;
+
+  private final KmsClient kmsClient;
+  private final String kmsInstanceID;
+  private final String kmsInstanceURL;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final SecureRandom random;
+  private final boolean doubleWrapping;
+
+  private short keyCounter;
+  private String accessToken;
+
+  public FileKeyWrapper(Configuration configuration, FileKeyMaterialStore keyMaterialStore) {
+    this.hadoopConfiguration = configuration;
+
+    cacheEntryLifetime = 1000l * hadoopConfiguration.getLong(KeyToolkit.TOKEN_LIFETIME_PROPERTY_NAME, 
+        KeyToolkit.DEFAULT_CACHE_ENTRY_LIFETIME); 
+
+    kmsInstanceID = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME, 
+        KmsClient.DEFAULT_KMS_INSTANCE_ID);
+
+    doubleWrapping =  hadoopConfiguration.getBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, true);
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME, KmsClient.DEFAULT_ACCESS_TOKEN);
+
+    kmsClient = KeyToolkit.getKmsClient(kmsInstanceID, configuration, accessToken, cacheEntryLifetime);
+
+    kmsInstanceURL = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_URL_PROPERTY_NAME, 
+        RemoteKmsClient.DEFAULT_KMS_INSTANCE_URL);
+
+    this.keyMaterialStore = keyMaterialStore;
+
+    random = new SecureRandom();
+    keyCounter = 0;
+
+    // Check caches upon each file writing (clean once in cacheEntryLifetime)
+    KeyToolkit.checkKmsCacheForExpiredTokens(cacheEntryLifetime);
+    if (doubleWrapping) {
+      checkKekCacheForExpiredTokens();
+
+      ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>> KEKCacheEntry = KEKMapPerToken.get(accessToken);
+      if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+        synchronized (KEKMapPerToken) {
+          KEKCacheEntry = KEKMapPerToken.get(accessToken);
+          if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+            KEKCacheEntry = new ExpiringCacheEntry<>(new ConcurrentHashMap<String, KeyEncryptionKey>(), cacheEntryLifetime);
+            KEKMapPerToken.put(accessToken, KEKCacheEntry);
+          }
+        }
+      }
+      KEKPerMasterKeyID = KEKCacheEntry.getCachedItem();
+    } else {
+      KEKPerMasterKeyID = null;
+    }
+  }
+
+  public byte[] getEncryptionKeyMetadata(byte[] dataKey, String masterKeyID, boolean isFooterKey) {
+    return getEncryptionKeyMetadata(dataKey, masterKeyID, isFooterKey, null);
+  }
+
+  static void removeCacheEntriesForToken(String accessToken) {
+    synchronized(KEKMapPerToken) {
+      KEKMapPerToken.remove(accessToken);
+    }
+  }
+
+  static void removeCacheEntriesForAllTokens() {
+    synchronized (KEKMapPerToken) {
+      KEKMapPerToken.clear();
+    }
+  }
+
+  private void checkKekCacheForExpiredTokens() {
+    long now = System.currentTimeMillis();
+
+    if (now > (lastKekCacheCleanupTimestamp + cacheEntryLifetime)) {
+      synchronized (KEKMapPerToken) {

Review comment:
       You may remove synchronization if you change 
`lastKekCacheCleanupTimestamp` to an `AtomicLong` and use the method 
`compareAndSet` to ensure only one thread executes the cleanup. The cleanup 
itself does not need synchronization as `ConcurrentMap` is thread-safe anyway.
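   A minimal sketch of this suggestion, assuming a simplified stand-in for `ExpiringCacheEntry` (the `KekCacheCleanupSketch` and `Entry` names are hypothetical) and passing `now` in explicitly so the behavior is easy to test. Only the thread whose `compareAndSet` succeeds runs the cleanup; the removal itself relies on the thread-safe views of `ConcurrentHashMap` rather than a lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class KekCacheCleanupSketch {
  // Hypothetical stand-in for ExpiringCacheEntry: only the expiration time matters here
  static final class Entry {
    final long expirationTimestamp;

    Entry(long expirationTimestamp) {
      this.expirationTimestamp = expirationTimestamp;
    }

    boolean isExpired(long now) {
      return now > expirationTimestamp;
    }
  }

  static final ConcurrentMap<String, Entry> KEK_MAP_PER_TOKEN = new ConcurrentHashMap<>();
  static final AtomicLong LAST_CLEANUP_TIMESTAMP = new AtomicLong(0);

  /** Returns true if this call performed the cleanup. */
  static boolean checkKekCacheForExpiredTokens(long now, long cacheEntryLifetime) {
    long last = LAST_CLEANUP_TIMESTAMP.get();
    if (now <= last + cacheEntryLifetime) {
      return false; // cleaned recently enough
    }
    // Only the thread whose CAS succeeds runs the cleanup; concurrent callers skip it
    if (!LAST_CLEANUP_TIMESTAMP.compareAndSet(last, now)) {
      return false;
    }
    // Removal through a ConcurrentHashMap view is thread-safe without a lock
    KEK_MAP_PER_TOKEN.values().removeIf(entry -> entry.isExpired(now));
    return true;
  }

  public static void main(String[] args) {
    KEK_MAP_PER_TOKEN.put("expiredToken", new Entry(500));
    KEK_MAP_PER_TOKEN.put("liveToken", new Entry(5_000));
    System.out.println(checkKekCacheForExpiredTokens(1_000, 600) + " " + KEK_MAP_PER_TOKEN.keySet());
    // A second call within the lifetime window is a no-op
    System.out.println(checkKekCacheForExpiredTokens(1_100, 600));
  }
}
```

   One trade-off: a thread that loses the CAS race skips the cleanup instead of waiting for it, which seems acceptable because the cleanup is opportunistic anyway.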

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
##########
@@ -0,0 +1,216 @@
+public class FileKeyUnwrapper implements DecryptionKeyRetriever {
+  // For every token: a map of KEK_ID to KEK bytes
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String,byte[]>>> KEKMapPerToken = new ConcurrentHashMap<>();
+  private volatile static long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute
+  //A map of KEK_ID to KEK - for the current token
+  private final ConcurrentMap<String,byte[]> KEKPerKekID;
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private KmsClient kmsClient = null;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final long cacheEntryLifetime;
+  private final String accessToken;
+
+  FileKeyUnwrapper(Configuration hadoopConfiguration, FileKeyMaterialStore keyStore) {
+    this.hadoopConfiguration = hadoopConfiguration;
+    this.keyMaterialStore = keyStore;
+
+    cacheEntryLifetime = 1000l * hadoopConfiguration.getLong(KeyToolkit.TOKEN_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.DEFAULT_CACHE_ENTRY_LIFETIME);
+
+    // Check cache upon each file reading (clean once in cacheEntryLifetime)
+    KeyToolkit.checkKmsCacheForExpiredTokens(cacheEntryLifetime);
+    checkKekCacheForExpiredTokens();
+
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME, 
+        KmsClient.DEFAULT_ACCESS_TOKEN);
+
+    ExpiringCacheEntry<ConcurrentMap<String, byte[]>> KEKCacheEntry = KEKMapPerToken.get(accessToken);
+    if (null == KEKCacheEntry || KEKCacheEntry.isExpired()) {
+      synchronized (KEKMapPerToken) {
+        KEKCacheEntry = KEKMapPerToken.get(accessToken);
+        if (null == KEKCacheEntry || KEKCacheEntry.isExpired()) {
+          KEKCacheEntry = new ExpiringCacheEntry<>(new ConcurrentHashMap<String, byte[]>(), cacheEntryLifetime);
+          KEKMapPerToken.put(accessToken, KEKCacheEntry);
+        }
+      }
+    }
+
+    KEKPerKekID = KEKCacheEntry.getCachedItem();
+  }
+
+  @Override
+  public byte[] getKey(byte[] keyMetaData) {
+    String keyMaterial;
+    if (null != keyMaterialStore) {
+      String keyReferenceMetadata = new String(keyMetaData, StandardCharsets.UTF_8);
+      String keyIDinFile = getKeyReference(keyReferenceMetadata);
+      keyMaterial = keyMaterialStore.getKeyMaterial(keyIDinFile);
+      if (null == keyMaterial) {
+        throw new ParquetCryptoRuntimeException("Null key material for keyIDinFile: " + keyIDinFile);
+      }
+    }  else {
+      keyMaterial = new String(keyMetaData, StandardCharsets.UTF_8);
+    }
+
+    return getDEKandMasterID(keyMaterial).getDataKey();
+  }
+
+  private void checkKekCacheForExpiredTokens() {
+    long now = System.currentTimeMillis();
+
+    if (now > (lastKekCacheCleanupTimestamp + cacheEntryLifetime)) {
+      synchronized (KEKMapPerToken) {
+        if (now > (lastKekCacheCleanupTimestamp + cacheEntryLifetime)) {
+          KeyToolkit.removeExpiredEntriesFromCache(KEKMapPerToken);
+          lastKekCacheCleanupTimestamp = now;
+        }
+      }
+    }
+  }
+
+  KeyWithMasterID getDEKandMasterID(String keyMaterial)  {
+    Map<String, String> keyMaterialJson = null;

Review comment:
       I think it would be more readable and less error-prone if you 
deserialized the JSON into a specific Java object with the required 
fields instead of a map. This way you do not have to use the field constants 
from `KeyToolkit` and can handle the potential issues (e.g. missing/incorrect 
values) in one place.
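   A dependency-free sketch of the idea. The PR already uses Jackson (`org.codehaus.jackson.map.ObjectMapper`), which could bind the JSON directly into such a class; to keep the example self-contained it starts from the `Map<String, String>` the current code produces. The JSON field names are the `KeyToolkit` constants from this diff; the `KeyMaterial` class and the rule that the KEK fields are required only when double wrapping is on are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical typed view of one key material entry; all validation happens in parse(). */
public final class KeyMaterial {
  final String masterKeyID;
  final String wrappedDEK;
  final boolean doubleWrapping;
  final String kekID;      // set only when doubleWrapping is true
  final String wrappedKEK; // set only when doubleWrapping is true

  private KeyMaterial(String masterKeyID, String wrappedDEK, boolean doubleWrapping,
      String kekID, String wrappedKEK) {
    this.masterKeyID = masterKeyID;
    this.wrappedDEK = wrappedDEK;
    this.doubleWrapping = doubleWrapping;
    this.kekID = kekID;
    this.wrappedKEK = wrappedKEK;
  }

  /** Single place where missing or incorrect values are detected. */
  static KeyMaterial parse(Map<String, String> json) {
    String type = json.get("keyMaterialType");
    if (!"PKMT1".equals(type)) {
      throw new IllegalArgumentException("Unsupported key material type: " + type);
    }
    boolean doubleWrapping = Boolean.parseBoolean(require(json, "doubleWrapping"));
    return new KeyMaterial(
        require(json, "masterKeyID"),
        require(json, "wrappedDEK"),
        doubleWrapping,
        doubleWrapping ? require(json, "keyEncryptionKeyID") : null,
        doubleWrapping ? require(json, "wrappedKEK") : null);
  }

  private static String require(Map<String, String> json, String field) {
    String value = json.get(field);
    if (value == null || value.isEmpty()) {
      throw new IllegalArgumentException("Missing key material field: " + field);
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, String> json = new HashMap<>();
    json.put("keyMaterialType", "PKMT1");
    json.put("masterKeyID", "footerMasterKey"); // hypothetical values
    json.put("wrappedDEK", "d3JhcHBlZA==");
    json.put("doubleWrapping", "false");
    KeyMaterial material = parse(json);
    System.out.println(material.masterKeyID + " doubleWrapping=" + material.doubleWrapping);
  }
}
```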

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java
##########
@@ -0,0 +1,127 @@
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class HadoopFSKeyMaterialStore implements FileKeyMaterialStore {
+  
+  public final static String KEY_MATERIAL_FILE_PREFIX = "_KEY_MATERIAL_FOR_";
+  public static final String TEMP_FILE_PREFIX = "_TMP";
+  public final static String KEY_MATERIAL_FILE_SUFFFIX = ".json";
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private FileSystem hadoopFileSystem;
+  private Map<String, String> keyMaterialMap;
+  private Path keyMaterialFile;
+  
+  HadoopFSKeyMaterialStore(FileSystem hadoopFileSystem) {
+    this.hadoopFileSystem = hadoopFileSystem;
+  }
+
+  @Override
+  public void initialize(Path parquetFilePath, Configuration hadoopConfig, boolean tempStore) {
+    String fullPrefix = (tempStore? TEMP_FILE_PREFIX : "");
+    fullPrefix += KEY_MATERIAL_FILE_PREFIX;
+    keyMaterialFile = new Path(parquetFilePath.getParent(),
+      fullPrefix + parquetFilePath.getName() + KEY_MATERIAL_FILE_SUFFFIX);
+  }
+
+  @Override
+  public void addKeyMaterial(String keyIDInFile, String keyMaterial) throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      keyMaterialMap = new HashMap<>();
+    }
+    keyMaterialMap.put(keyIDInFile, keyMaterial);
+  }
+
+  @Override
+  public String getKeyMaterial(String keyIDInFile)  throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      loadKeyMaterialMap();
+    }
+    return keyMaterialMap.get(keyIDInFile);
+  }
+  
+  private void loadKeyMaterialMap() {
+    try (FSDataInputStream keyMaterialStream = hadoopFileSystem.open(keyMaterialFile)) {
+      JsonNode keyMaterialJson = objectMapper.readTree(keyMaterialStream);
+      keyMaterialMap = objectMapper.readValue(keyMaterialJson,
+        new TypeReference<Map<String, String>>() { });
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to get key material from " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public void saveMaterial() throws ParquetCryptoRuntimeException {
+    // TODO needed? Path qualifiedPath = parquetFilePath.makeQualified(hadoopFileSystem);
+    try (FSDataOutputStream keyMaterialStream = hadoopFileSystem.create(keyMaterialFile)) {
+      objectMapper.writeValue(keyMaterialStream, keyMaterialMap);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to save key material in " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public Set<String> getKeyIDSet() throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      loadKeyMaterialMap();
+    }
+
+    return keyMaterialMap.keySet();

Review comment:
       This way you expose the map itself: the caller can remove elements from 
the map. You may wrap the set by using `Collections.unmodifiableSet`.
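   A short sketch of the suggested fix, with the store reduced to its map (the `KeyIdSetSketch` class is hypothetical). Note that `Collections.unmodifiableSet` returns a read-only view, so entries added to the map later are still visible through it; returning `new HashSet<>(keyMaterialMap.keySet())` would give a snapshot instead.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class KeyIdSetSketch {
  private final Map<String, String> keyMaterialMap = new HashMap<>();

  void addKeyMaterial(String keyIDInFile, String keyMaterial) {
    keyMaterialMap.put(keyIDInFile, keyMaterial);
  }

  /** Read-only view: callers can iterate but cannot remove entries from the backing map. */
  Set<String> getKeyIDSet() {
    return Collections.unmodifiableSet(keyMaterialMap.keySet());
  }

  int size() {
    return keyMaterialMap.size();
  }

  public static void main(String[] args) {
    KeyIdSetSketch store = new KeyIdSetSketch();
    store.addKeyMaterial("kf", "{...}");
    store.addKeyMaterial("k1", "{...}");
    try {
      store.getKeyIDSet().remove("kf");
    } catch (UnsupportedOperationException e) {
      System.out.println("removal rejected; entries left: " + store.size());
    }
  }
}
```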

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
##########
@@ -0,0 +1,216 @@
+public class FileKeyUnwrapper implements DecryptionKeyRetriever {
+  // For every token: a map of KEK_ID to KEK bytes
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String,byte[]>>> KEKMapPerToken = new ConcurrentHashMap<>();

Review comment:
       I am not sure I completely get why it is necessary to cache these 
statically. It would need some more description.
   
   Anyway, if this is what we want, I think it can be designed in a more readable 
and understandable way. I would have this map inside a separate singleton class 
with all the methods needed to access it (synchronized if required). This way 
you only need to have one static instance; all the others can be simple 
instance methods.
   Another tip: `ConcurrentMap` exists to provide a map that is thread-safe 
without explicit synchronization. You may either implement the required 
functionality using the methods of `ConcurrentMap` or, if that is not possible, 
you may simply use a `HashMap` inside your class and synchronize the methods 
that access it.
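   One possible shape for the suggested singleton, sketched under assumptions: the `KekCache` class, its method names, and the simplified `Entry` type are hypothetical. The "create or replace if expired" step uses `ConcurrentMap.compute`, which `ConcurrentHashMap` performs atomically per key, so the double-checked `synchronized` blocks in the constructors would no longer be needed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical singleton owning the per-token KEK caches. */
public final class KekCache {
  private static final KekCache INSTANCE = new KekCache();

  // Simplified stand-in for ExpiringCacheEntry<ConcurrentMap<String, byte[]>>
  private static final class Entry {
    final ConcurrentMap<String, byte[]> kekPerKekID = new ConcurrentHashMap<>();
    final long expirationTimestamp;

    Entry(long expirationTimestamp) {
      this.expirationTimestamp = expirationTimestamp;
    }

    boolean isExpired(long now) {
      return now > expirationTimestamp;
    }
  }

  private final ConcurrentMap<String, Entry> cachePerToken = new ConcurrentHashMap<>();

  private KekCache() { }

  static KekCache getInstance() {
    return INSTANCE;
  }

  /** Returns the KEK map for a token, atomically replacing a missing or expired entry. */
  ConcurrentMap<String, byte[]> getOrCreate(String accessToken, long now, long lifetimeMillis) {
    Entry entry = cachePerToken.compute(accessToken, (token, current) ->
        (current == null || current.isExpired(now)) ? new Entry(now + lifetimeMillis) : current);
    return entry.kekPerKekID;
  }

  void removeExpired(long now) {
    cachePerToken.values().removeIf(entry -> entry.isExpired(now));
  }

  void remove(String accessToken) {
    cachePerToken.remove(accessToken);
  }

  void clear() {
    cachePerToken.clear();
  }

  public static void main(String[] args) {
    KekCache cache = KekCache.getInstance();
    ConcurrentMap<String, byte[]> keks = cache.getOrCreate("token", 0, 1_000);
    keks.put("kek1", new byte[16]);
    System.out.println(cache.getOrCreate("token", 500, 1_000) == keks);     // true: still live
    System.out.println(cache.getOrCreate("token", 2_000, 1_000).isEmpty()); // true: replaced after expiry
  }
}
```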

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/PropertiesDrivenCryptoFactory.java
##########
@@ -0,0 +1,203 @@
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public class PropertiesDrivenCryptoFactory implements EncryptionPropertiesFactory, DecryptionPropertiesFactory {
+
+  public static final String COLUMN_KEYS_PROPERTY_NAME = "encryption.column.keys";
+  public static final String FOOTER_KEY_PROPERTY_NAME = "encryption.footer.key";
+  public static final String ENCRYPTION_ALGORITHM_PROPERTY_NAME = "encryption.algorithm";
+  public static final String PLAINTEXT_FOOTER_PROPERTY_NAME = "encryption.plaintext.footer";
+  
+  public static final int DEK_LENGTH = 16;
+
+  private static final SecureRandom random = new SecureRandom();
+
+  @Override
+  public FileEncryptionProperties getFileEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
+      WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
+
+    String footerKeyId = fileHadoopConfig.getTrimmed(FOOTER_KEY_PROPERTY_NAME); 
+    String columnKeysStr = fileHadoopConfig.getTrimmed(COLUMN_KEYS_PROPERTY_NAME);
+
+    // File shouldn't be encrypted
+    if (stringIsEmpty(footerKeyId) && stringIsEmpty(columnKeysStr)) {
+      return null; 
+    }
+
+    if (stringIsEmpty(footerKeyId)) {
+      throw new ParquetCryptoRuntimeException("Undefined footer key");
+    }
+
+    FileKeyMaterialStore keyMaterialStore = null;
+    boolean keyMaterialInternalStorage = fileHadoopConfig.getBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, true);
+    if (!keyMaterialInternalStorage) {
+      try {
+        keyMaterialStore = new HadoopFSKeyMaterialStore(tempFilePath.getFileSystem(fileHadoopConfig));
+        keyMaterialStore.initialize(tempFilePath, fileHadoopConfig, false);
+      } catch (IOException e) {
+        throw new ParquetCryptoRuntimeException("Failed to get key material store", e);
+      }
+    }
+
+    FileKeyWrapper keyWrapper = new FileKeyWrapper(fileHadoopConfig, keyMaterialStore);
+
+    String algo = fileHadoopConfig.getTrimmed(ENCRYPTION_ALGORITHM_PROPERTY_NAME);
+    ParquetCipher cipher;
+    if (stringIsEmpty(algo)) {
+      cipher = ParquetCipher.AES_GCM_V1;
+    } else {
+      if (algo.equalsIgnoreCase("AES_GCM_V1")) {
+        cipher = ParquetCipher.AES_GCM_V1;
+      } else if (algo.equalsIgnoreCase("AES_GCM_CTR_V1")) {
+        cipher = ParquetCipher.AES_GCM_CTR_V1;
+      }
+      else {
+        throw new ParquetCryptoRuntimeException("Wrong encryption algorithm: " + algo);
+      }

Review comment:
       You may use `ParquetCipher.valueOf(String)` instead. In that case you 
have to handle the `IllegalArgumentException`.
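   A sketch of the `valueOf`-based selection. It uses a local enum that mirrors only the two `ParquetCipher` constants seen in this diff, and a plain `RuntimeException` in place of `ParquetCryptoRuntimeException`, so that it compiles on its own (both are stand-ins, as is the `CipherSelectionSketch` name). Since `Enum.valueOf` is case-sensitive, the input is upper-cased first to keep the `equalsIgnoreCase` behavior of the original code.

```java
import java.util.Locale;

public class CipherSelectionSketch {
  // Stand-in mirroring the two ParquetCipher constants used in the PR
  enum ParquetCipher { AES_GCM_V1, AES_GCM_CTR_V1 }

  static ParquetCipher selectCipher(String algo) {
    if (algo == null || algo.isEmpty()) {
      return ParquetCipher.AES_GCM_V1; // default, as in the original code
    }
    try {
      // valueOf is case-sensitive; upper-casing keeps the equalsIgnoreCase semantics
      return ParquetCipher.valueOf(algo.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // The PR would throw ParquetCryptoRuntimeException here
      throw new RuntimeException("Wrong encryption algorithm: " + algo, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(selectCipher("aes_gcm_ctr_v1"));
    System.out.println(selectCipher(""));
  }
}
```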

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
##########
@@ -0,0 +1,216 @@
+public class FileKeyUnwrapper implements DecryptionKeyRetriever {
+  // For every token: a map of KEK_ID to KEK bytes
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String,byte[]>>> KEKMapPerToken = new ConcurrentHashMap<>();
+  private volatile static long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute

Review comment:
       nit: I think the `l` suffix is not required and is a bit misleading (it looks like `1`).
   ```suggestion
     private volatile static long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60 * 1000; // grace period of 1 minute
   
   ```

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
##########
@@ -0,0 +1,299 @@
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.AesGcmDecryptor;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.AesMode;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+public class KeyToolkit {
+
+  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "encryption.kms.client.class";
+  public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "encryption.kms.instance.id";
+  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "encryption.double.wrapping";
+  public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "encryption.key.access.token";
+  public static final String TOKEN_LIFETIME_PROPERTY_NAME = "encryption.key.access.token.lifetime";
+  public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "encryption.kms.instance.url";
+  public static final String WRAP_LOCALLY_PROPERTY_NAME = "encryption.wrap.locally";
+  public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "encryption.key.material.internal.storage";
+
+  public static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
+  public static final String KEY_MATERIAL_TYPE = "PKMT1";
+  public static final String KEY_MATERIAL_INTERNAL_STORAGE_FIELD = "internalStorage";
+  public static final String KEY_REFERENCE_FIELD = "keyReference";
+  public static final String DOUBLE_WRAPPING_FIELD = "doubleWrapping";
+
+  public static final String KMS_INSTANCE_ID_FIELD = "kmsInstanceID";
+  public static final String KMS_INSTANCE_URL_FIELD = "kmsInstanceURL";
+
+  public static final String MASTER_KEY_ID_FIELD = "masterKeyID";
+  public static final String WRAPPED_DEK_FIELD = "wrappedDEK";
+  public static final String KEK_ID_FIELD = "keyEncryptionKeyID";
+  public static final String WRAPPED_KEK_FIELD = "wrappedKEK";
+
+  public static final String FOOTER_KEY_ID_IN_FILE = "kf";
+  public static final String KEY_ID_IN_FILE_PREFIX = "k";
+
+  public static final long DEFAULT_CACHE_ENTRY_LIFETIME = 10 * 60; // 10 minutes

Review comment:
       I think having the time unit in the name would help readability.
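   A minimal sketch of the rename. The `_SECONDS` suffix, the holder class and the `lifetimeMillis` helper are illustrative assumptions, not code from the PR. Using `java.util.concurrent.TimeUnit` would also replace the hand-written `1000l *` conversion in `FileKeyUnwrapper`:
   
   ```java
   import java.util.concurrent.TimeUnit;
   
   // Hypothetical holder class for the renamed constant.
   final class CacheLifetimeDefaults {
     // The unit is part of the name, so callers cannot mistake seconds for milliseconds.
     static final long DEFAULT_CACHE_ENTRY_LIFETIME_SECONDS = TimeUnit.MINUTES.toSeconds(10);
   
     private CacheLifetimeDefaults() {}
   
     // Hypothetical helper: converts a configured lifetime in seconds to the
     // milliseconds compared against System.currentTimeMillis().
     static long lifetimeMillis(long lifetimeSeconds) {
       return TimeUnit.SECONDS.toMillis(lifetimeSeconds);
     }
   }
   ```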

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools.samples;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KmsClient;
+import org.apache.parquet.crypto.keytools.RemoteKmsClient;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class VaultClient extends RemoteKmsClient {

Review comment:
       This is a sample implementation. I would expect some comments about it: what it is for, why it is only a sample, and a clear note that it is not for production use.
   
   What I don't like is where we have it. It will end up in our production jar, like another API that we did not create for production use: `Group`. There were, and still are, many misunderstandings around this _example_ data model. So I would suggest putting it in a separate place. I don't have the perfect place, though. Maybe under tests, or we could create another submodule for samples/examples. It might be worth asking on the dev list whether someone has a good idea.
   There is an even stronger reason not to add this to the production code: it brings `okhttp3` in. Such libraries will always have security issues, and I do not want to take on this burden in production.
   
   Update:
   I can see there are 3 different maps like this one (here, in `FileKeyWrapper` and in `KeyToolkit`) that are handled together. The singleton instance may contain all of them and all the functionality that is required to handle them.
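   One possible shape for such a singleton, as a hypothetical sketch rather than code from the PR. `TokenScopedCache` and `KeyCacheRegistry` are invented names, `Object` stands in for `KmsClient`, and a private entry class replaces the PR's `ExpiringCacheEntry`:
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.function.Supplier;
   
   // Hypothetical per-token cache: one instance per kind of cached object.
   final class TokenScopedCache<V> {
     private static final class Entry<T> {
       final T value;
       final long expirationMillis;
   
       Entry(T value, long expirationMillis) {
         this.value = value;
         this.expirationMillis = expirationMillis;
       }
   
       boolean isExpired(long now) {
         return now > expirationMillis;
       }
     }
   
     private final ConcurrentMap<String, Entry<V>> entries = new ConcurrentHashMap<>();
   
     // Returns the cached value for the token, creating a new one if absent or expired.
     V getOrCreate(String accessToken, long lifetimeMillis, Supplier<V> factory) {
       long now = System.currentTimeMillis();
       return entries.compute(accessToken, (token, entry) ->
           (entry == null || entry.isExpired(now))
               ? new Entry<V>(factory.get(), now + lifetimeMillis)
               : entry).value;
     }
   
     void removeForToken(String accessToken) {
       entries.remove(accessToken);
     }
   
     void removeAll() {
       entries.clear();
     }
   
     void removeExpired() {
       long now = System.currentTimeMillis();
       entries.values().removeIf(entry -> entry.isExpired(now));
     }
   
     int size() {
       return entries.size();
     }
   }
   
   // Hypothetical singleton owning the three per-token caches, so that flushing
   // a (possibly compromised) token happens in one place.
   enum KeyCacheRegistry {
     INSTANCE;
   
     // Object stands in for KmsClient; byte[] values are KEK bytes keyed by KEK ID.
     final TokenScopedCache<ConcurrentMap<String, Object>> kmsClientsPerToken = new TokenScopedCache<>();
     final TokenScopedCache<ConcurrentMap<String, byte[]>> writeKeksPerToken = new TokenScopedCache<>();
     final TokenScopedCache<ConcurrentMap<String, byte[]>> readKeksPerToken = new TokenScopedCache<>();
   
     void removeCacheEntriesForToken(String accessToken) {
       kmsClientsPerToken.removeForToken(accessToken);
       writeKeksPerToken.removeForToken(accessToken);
       readKeksPerToken.removeForToken(accessToken);
     }
   
     void removeCacheEntriesForAllTokens() {
       kmsClientsPerToken.removeAll();
       writeKeksPerToken.removeAll();
       readKeksPerToken.removeAll();
     }
   
     void removeExpiredEntries() {
       kmsClientsPerToken.removeExpired();
       writeKeksPerToken.removeExpired();
       readKeksPerToken.removeExpired();
     }
   }
   ```
   
   With this shape, the static `removeCacheEntriesForToken` calls currently spread across `KeyToolkit`, `FileKeyWrapper` and `FileKeyUnwrapper` would collapse into a single call on the registry.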

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyWithMasterID;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public class FileKeyUnwrapper implements DecryptionKeyRetriever {
+  // For every token: a map of KEK_ID to KEK bytes
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String,byte[]>>> KEKMapPerToken = new ConcurrentHashMap<>();
+  private volatile static long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute
+  //A map of KEK_ID to KEK - for the current token
+  private final ConcurrentMap<String,byte[]> KEKPerKekID;
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private KmsClient kmsClient = null;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final long cacheEntryLifetime;
+  private final String accessToken;
+
+  FileKeyUnwrapper(Configuration hadoopConfiguration, FileKeyMaterialStore keyStore) {
+    this.hadoopConfiguration = hadoopConfiguration;
+    this.keyMaterialStore = keyStore;
+
+    cacheEntryLifetime = 1000l * hadoopConfiguration.getLong(KeyToolkit.TOKEN_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.DEFAULT_CACHE_ENTRY_LIFETIME);
+
+    // Check cache upon each file reading (clean once in cacheEntryLifetime)
+    KeyToolkit.checkKmsCacheForExpiredTokens(cacheEntryLifetime);
+    checkKekCacheForExpiredTokens();
+
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME,
+        KmsClient.DEFAULT_ACCESS_TOKEN);
+
+    ExpiringCacheEntry<ConcurrentMap<String, byte[]>> KEKCacheEntry = KEKMapPerToken.get(accessToken);

Review comment:
       nit:
   ```suggestion
       ExpiringCacheEntry<ConcurrentMap<String, byte[]>> kekCacheEntry = KEKMapPerToken.get(accessToken);
   ```
   The same applies to the other fields and variables whose names start with `KEK`.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+

Review comment:
       nit:
   ```suggestion
   ```

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/PropertiesDrivenCryptoFactory.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public class PropertiesDrivenCryptoFactory implements EncryptionPropertiesFactory, DecryptionPropertiesFactory {
+
+  public static final String COLUMN_KEYS_PROPERTY_NAME = "encryption.column.keys";
+  public static final String FOOTER_KEY_PROPERTY_NAME = "encryption.footer.key";
+  public static final String ENCRYPTION_ALGORITHM_PROPERTY_NAME = "encryption.algorithm";
+  public static final String PLAINTEXT_FOOTER_PROPERTY_NAME = "encryption.plaintext.footer";
+  
+  public static final int DEK_LENGTH = 16;
+
+  private static final SecureRandom random = new SecureRandom();
+
+  @Override
+  public FileEncryptionProperties getFileEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
+      WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
+
+    String footerKeyId = fileHadoopConfig.getTrimmed(FOOTER_KEY_PROPERTY_NAME);
+    String columnKeysStr = fileHadoopConfig.getTrimmed(COLUMN_KEYS_PROPERTY_NAME);
+
+    // File shouldn't be encrypted
+    if (stringIsEmpty(footerKeyId) && stringIsEmpty(columnKeysStr)) {
+      return null; 
+    }
+
+    if (stringIsEmpty(footerKeyId)) {
+      throw new ParquetCryptoRuntimeException("Undefined footer key");
+    }
+
+    FileKeyMaterialStore keyMaterialStore = null;
+    boolean keyMaterialInternalStorage = fileHadoopConfig.getBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, true);

Review comment:
       We read/write this property (and maybe others as well) in several places, each time with the default value `true` hard-coded. It would be better to centralize it and define the default value in one place.
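   A minimal sketch of the idea, with invented names (`KeyToolsProperties`, `KEY_MATERIAL_INTERNAL_DEFAULT`, `isKeyMaterialInternal`). `java.util.Properties` stands in for Hadoop's `Configuration` only to keep the snippet self-contained; with Hadoop, the body would be `conf.getBoolean(KEY_MATERIAL_INTERNAL_PROPERTY_NAME, KEY_MATERIAL_INTERNAL_DEFAULT)`, which may handle malformed values differently from `Boolean.parseBoolean`:
   
   ```java
   import java.util.Properties;
   
   // Hypothetical sketch: the property name lives next to its single default value.
   final class KeyToolsProperties {
     static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "encryption.key.material.internal.storage";
     static final boolean KEY_MATERIAL_INTERNAL_DEFAULT = true;
   
     private KeyToolsProperties() {}
   
     // The only place that reads the property, so the default cannot drift between call sites.
     static boolean isKeyMaterialInternal(Properties conf) {
       String value = conf.getProperty(KEY_MATERIAL_INTERNAL_PROPERTY_NAME);
       return (value == null) ? KEY_MATERIAL_INTERNAL_DEFAULT : Boolean.parseBoolean(value.trim());
     }
   }
   ```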

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.AesGcmDecryptor;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.AesMode;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+public class KeyToolkit {
+
+  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "encryption.kms.client.class";
+  public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "encryption.kms.instance.id";
+  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "encryption.double.wrapping";
+  public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "encryption.key.access.token";
+  public static final String TOKEN_LIFETIME_PROPERTY_NAME = "encryption.key.access.token.lifetime";
+  public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "encryption.kms.instance.url";
+  public static final String WRAP_LOCALLY_PROPERTY_NAME = "encryption.wrap.locally";
+  public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "encryption.key.material.internal.storage";
+
+  public static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
+  public static final String KEY_MATERIAL_TYPE = "PKMT1";
+  public static final String KEY_MATERIAL_INTERNAL_STORAGE_FIELD = "internalStorage";
+  public static final String KEY_REFERENCE_FIELD = "keyReference";
+  public static final String DOUBLE_WRAPPING_FIELD = "doubleWrapping";
+
+  public static final String KMS_INSTANCE_ID_FIELD = "kmsInstanceID";
+  public static final String KMS_INSTANCE_URL_FIELD = "kmsInstanceURL";
+
+  public static final String MASTER_KEY_ID_FIELD = "masterKeyID";
+  public static final String WRAPPED_DEK_FIELD = "wrappedDEK";
+  public static final String KEK_ID_FIELD = "keyEncryptionKeyID";
+  public static final String WRAPPED_KEK_FIELD = "wrappedKEK";
+
+  public static final String FOOTER_KEY_ID_IN_FILE = "kf";
+  public static final String KEY_ID_IN_FILE_PREFIX = "k";
+
+  public static final long DEFAULT_CACHE_ENTRY_LIFETIME = 10 * 60; // 10 minutes
+  public static final int INITIAL_PER_TOKEN_CACHE_SIZE = 5;
+
+  // For every token: a map of KMSInstanceId to kmsClient
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, KmsClient>>> kmsClientCachePerToken =
+      new ConcurrentHashMap<>(INITIAL_PER_TOKEN_CACHE_SIZE);
+  private static volatile long lastKmsCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute
+
+  static class KeyWithMasterID {
+
+    private final byte[] keyBytes;
+    private final String masterID ;
+
+    KeyWithMasterID(byte[] keyBytes, String masterID) {
+      this.keyBytes = keyBytes;
+      this.masterID = masterID;
+    }
+
+    byte[] getDataKey() {
+      return keyBytes;
+    }
+
+    String getMasterID() {
+      return masterID;
+    }
+  }
+
+  static class KeyEncryptionKey {
+    private final byte[] kekBytes;
+    private final byte[] kekID;
+    private final String encodedKEK_ID;
+    private final String encodedWrappedKEK;
+
+    KeyEncryptionKey(byte[] kekBytes, String encodedKEK_ID, byte[] kekID, String encodedWrappedKEK) {
+      this.kekBytes = kekBytes;
+      this.kekID = kekID;
+      this.encodedKEK_ID = encodedKEK_ID;
+      this.encodedWrappedKEK = encodedWrappedKEK;
+    }
+
+    byte[] getBytes() {
+      return kekBytes;
+    }
+
+    byte[] getID() {
+      return kekID;
+    }
+
+    String getEncodedID() {
+      return encodedKEK_ID;
+    }
+
+    String getWrappedWithCRK() {
+      return encodedWrappedKEK;
+    }
+  }
+
+  public static void rotateMasterKeys(String folderPath, Configuration hadoopConfig)
+      throws IOException, ParquetCryptoRuntimeException, KeyAccessDeniedException {
+
+    Path parentPath = new Path(folderPath);
+
+    FileSystem hadoopFileSystem = parentPath.getFileSystem(hadoopConfig);
+
+    FileStatus[] keyMaterialFiles = hadoopFileSystem.listStatus(parentPath, HiddenFileFilter.INSTANCE);
+
+    for (FileStatus fs : keyMaterialFiles) {
+      Path parquetFile = fs.getPath();
+
+      FileKeyMaterialStore keyMaterialStore = new HadoopFSKeyMaterialStore(hadoopFileSystem);
+      keyMaterialStore.initialize(parquetFile, hadoopConfig, false);
+      FileKeyUnwrapper fileKeyUnwrapper = new FileKeyUnwrapper(hadoopConfig, keyMaterialStore);
+
+      FileKeyMaterialStore tempKeyMaterialStore = new HadoopFSKeyMaterialStore(hadoopFileSystem);
+      tempKeyMaterialStore.initialize(parquetFile, hadoopConfig, true);
+      FileKeyWrapper fileKeyWrapper = new FileKeyWrapper(hadoopConfig, tempKeyMaterialStore);
+
+      Set<String> fileKeyIdSet = keyMaterialStore.getKeyIDSet();
+
+      // Start with footer key (to get KMS ID, URL, if needed) 
+      String keyMaterial = keyMaterialStore.getKeyMaterial(FOOTER_KEY_ID_IN_FILE);
+      KeyWithMasterID key = fileKeyUnwrapper.getDEKandMasterID(keyMaterial);
+      fileKeyWrapper.getEncryptionKeyMetadata(key.getDataKey(), key.getMasterID(), true, FOOTER_KEY_ID_IN_FILE);
+
+      fileKeyIdSet.remove(FOOTER_KEY_ID_IN_FILE);
+      // Rotate column keys
+      for (String keyIdInFile : fileKeyIdSet) {
+        keyMaterial = keyMaterialStore.getKeyMaterial(keyIdInFile);
+        key = fileKeyUnwrapper.getDEKandMasterID(keyMaterial);
+        fileKeyWrapper.getEncryptionKeyMetadata(key.getDataKey(), key.getMasterID(), false, keyIdInFile);
+      }
+
+      tempKeyMaterialStore.saveMaterial();
+
+      keyMaterialStore.removeMaterial();
+
+      tempKeyMaterialStore.moveMaterialTo(keyMaterialStore);
+    }
+
+    removeCacheEntriesForAllTokens();
+  }
+  
+  public static void removeCacheEntriesForAllTokens() {
+    synchronized (kmsClientCachePerToken) {
+      kmsClientCachePerToken.clear();
+    }
+    FileKeyWrapper.removeCacheEntriesForAllTokens();
+    FileKeyUnwrapper.removeCacheEntriesForAllTokens();
+  }
+
+  public static String wrapKeyLocally(byte[] key, byte[] wrappingKey, byte[] AAD) {
+    AesGcmEncryptor keyEncryptor;
+
+    keyEncryptor = (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, wrappingKey);
+
+    byte[] wrappedKey = keyEncryptor.encrypt(false, key, AAD);
+
+    return Base64.getEncoder().encodeToString(wrappedKey);
+  }
+
+  public static byte[] unwrapKeyLocally(String encodedWrappedKey, byte[] wrappingKey, byte[] AAD) {
+    byte[] wrappedKEy = Base64.getDecoder().decode(encodedWrappedKey);
+    AesGcmDecryptor keyDecryptor;
+
+    keyDecryptor = (AesGcmDecryptor) ModuleCipherFactory.getDecryptor(AesMode.GCM, wrappingKey);
+
+    return keyDecryptor.decrypt(wrappedKEy, 0, wrappedKEy.length, AAD);
+  }
+
+  /**
+   * Flush any caches that are tied to the (compromised) accessToken
+   * @param accessToken
+   */
+  public static void removeCacheEntriesForToken(String accessToken) {
+    synchronized (kmsClientCachePerToken) {
+      kmsClientCachePerToken.remove(accessToken);
+    }
+
+    FileKeyWrapper.removeCacheEntriesForToken(accessToken);
+
+    FileKeyUnwrapper.removeCacheEntriesForToken(accessToken);
+  }
+
+
+  static void checkKmsCacheForExpiredTokens(long cacheEntryLifetime) {
+    long now = System.currentTimeMillis();
+
+    if (now > (lastKmsCacheCleanupTimestamp + cacheEntryLifetime)) {
+      synchronized (kmsClientCachePerToken) {
+        if (now > (lastKmsCacheCleanupTimestamp + cacheEntryLifetime)) {
+          removeExpiredEntriesFromCache(kmsClientCachePerToken);
+          lastKmsCacheCleanupTimestamp = now;
+        }
+      }
+    }
+  }
+
+  static KmsClient getKmsClient(String kmsInstanceID, Configuration configuration, String accessToken, long cacheEntryLifetime) {
+    // Try cache first
+    ExpiringCacheEntry<ConcurrentMap<String, KmsClient>> kmsClientCachePerTokenEntry = kmsClientCachePerToken.get(accessToken);
+    if ((null == kmsClientCachePerTokenEntry) || kmsClientCachePerTokenEntry.isExpired()) {
+      synchronized (kmsClientCachePerToken) {
+        kmsClientCachePerTokenEntry = kmsClientCachePerToken.get(accessToken);
+        if ((null == kmsClientCachePerTokenEntry) || kmsClientCachePerTokenEntry.isExpired()) {
+          ConcurrentMap<String, KmsClient> kmsClientPerToken = new ConcurrentHashMap<>();
+          kmsClientCachePerTokenEntry = new ExpiringCacheEntry<>(kmsClientPerToken, cacheEntryLifetime);
+          kmsClientCachePerToken.put(accessToken, kmsClientCachePerTokenEntry);
+        }
+      }
+    }
+
+    Map<String, KmsClient> kmsClientPerKmsInstanceCache = kmsClientCachePerTokenEntry.getCachedItem();
+    KmsClient kmsClient =
+        kmsClientPerKmsInstanceCache.computeIfAbsent(kmsInstanceID,
+            (k) -> createAndInitKmsClient(kmsInstanceID, configuration, accessToken));
+
+    return kmsClient;
+  }
+
+  private static KmsClient createAndInitKmsClient(String kmsInstanceID, Configuration configuration, String accessToken) {
+    Class<?> kmsClientClass = null;
+    KmsClient kmsClient = null;
+
+    try {
+      kmsClientClass = ConfigurationUtil.getClassFromConfig(configuration,
+          KMS_CLIENT_CLASS_PROPERTY_NAME, KmsClient.class);
+
+      if (null == kmsClientClass) {
+        throw new ParquetCryptoRuntimeException("Unspecified " + KMS_CLIENT_CLASS_PROPERTY_NAME);
+      }
+      kmsClient = (KmsClient)kmsClientClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException | BadConfigurationException e) {
+      throw new ParquetCryptoRuntimeException("Could not instantiate KmsClient class: "
+          + kmsClientClass, e);
+    }
+
+    kmsClient.initialize(configuration, kmsInstanceID, accessToken);
+
+    return kmsClient;
+  }
+
+  static String formatTokenForLog(String accessToken) {
+    int maxTokenDisplayLength = 5;
+    if (accessToken.length() <= maxTokenDisplayLength) {
+      return accessToken;
+    }
+    return accessToken.substring(accessToken.length() - maxTokenDisplayLength);
+  }
+
+  static boolean stringIsEmpty(String str) {
+    return (null == str) || str.isEmpty();
+  }
+
+  static <E> void removeExpiredEntriesFromCache(Map<String, ExpiringCacheEntry<E>> cache) {
+    Set<Map.Entry<String, ExpiringCacheEntry<E>>> cacheEntries = cache.entrySet();
+    List<String> expiredKeys = new ArrayList<>(cacheEntries.size());
+    for (Map.Entry<String, ExpiringCacheEntry<E>> cacheEntry : cacheEntries) {
+      if (cacheEntry.getValue().isExpired()) {
+        expiredKeys.add(cacheEntry.getKey());
+      }
+    }
+    cache.keySet().removeAll(expiredKeys);
+  }

Review comment:
       What about:
   ```java
     cache.values().removeIf(cacheEntry -> cacheEntry.isExpired());
   ```
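   A self-contained sketch of the suggested one-liner, assuming `ExpiringCacheEntry` exposes `isExpired()` as used above. The small `ExpiringEntry` class is a stand-in, not the PR's class. Removing through the `values()` view also removes the matching keys, and the view of a `ConcurrentHashMap` supports removal, so the intermediate `expiredKeys` list is no longer needed:
   
   ```java
   import java.util.Map;
   
   final class ExpiredEntryCleanup {
     // Minimal stand-in for the PR's ExpiringCacheEntry; only isExpired() matters here.
     static final class ExpiringEntry<E> {
       private final E item;
       private final long expirationTimestamp;
   
       ExpiringEntry(E item, long expirationTimestamp) {
         this.item = item;
         this.expirationTimestamp = expirationTimestamp;
       }
   
       boolean isExpired() {
         return System.currentTimeMillis() > expirationTimestamp;
       }
   
       E getCachedItem() {
         return item;
       }
     }
   
     // Single pass over the values view; each removal also drops the corresponding key.
     static <E> void removeExpiredEntriesFromCache(Map<String, ExpiringEntry<E>> cache) {
       cache.values().removeIf(cacheEntry -> cacheEntry.isExpired());
     }
   }
   ```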

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class HadoopFSKeyMaterialStore implements FileKeyMaterialStore {
+  
+  public final static String KEY_MATERIAL_FILE_PREFIX = "_KEY_MATERIAL_FOR_";
+  public static final String TEMP_FILE_PREFIX = "_TMP";
+  public final static String KEY_MATERIAL_FILE_SUFFFIX = ".json";
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private FileSystem hadoopFileSystem;
+  private Map<String, String> keyMaterialMap;
+  private Path keyMaterialFile;
+  
+  HadoopFSKeyMaterialStore(FileSystem hadoopFileSystem) {
+    this.hadoopFileSystem = hadoopFileSystem;
+  }
+
+  @Override
+  public void initialize(Path parquetFilePath, Configuration hadoopConfig, boolean tempStore) {
+    String fullPrefix = (tempStore? TEMP_FILE_PREFIX : "");
+    fullPrefix += KEY_MATERIAL_FILE_PREFIX;
+    keyMaterialFile = new Path(parquetFilePath.getParent(),
+      fullPrefix + parquetFilePath.getName() + KEY_MATERIAL_FILE_SUFFFIX);
+  }
+
+  @Override
+  public void addKeyMaterial(String keyIDInFile, String keyMaterial) throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      keyMaterialMap = new HashMap<>();
+    }
+    keyMaterialMap.put(keyIDInFile, keyMaterial);
+  }
+
+  @Override
+  public String getKeyMaterial(String keyIDInFile)  throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      loadKeyMaterialMap();
+    }
+    return keyMaterialMap.get(keyIDInFile);
+  }
+  
+  private void loadKeyMaterialMap() {
+    try (FSDataInputStream keyMaterialStream = hadoopFileSystem.open(keyMaterialFile)) {
+      JsonNode keyMaterialJson = objectMapper.readTree(keyMaterialStream);
+      keyMaterialMap = objectMapper.readValue(keyMaterialJson,
+        new TypeReference<Map<String, String>>() { });
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to get key material from " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public void saveMaterial() throws ParquetCryptoRuntimeException {
+    // TODO needed? Path qualifiedPath = parquetFilePath.makeQualified(hadoopFileSystem);

Review comment:
       What do you need to solve this TODO?

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.AesGcmDecryptor;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.AesMode;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+public class KeyToolkit {
+
+  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "encryption.kms.client.class";
+  public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "encryption.kms.instance.id";
+  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "encryption.double.wrapping";
+  public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "encryption.key.access.token";
+  public static final String TOKEN_LIFETIME_PROPERTY_NAME = "encryption.key.access.token.lifetime";
+  public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "encryption.kms.instance.url";
+  public static final String WRAP_LOCALLY_PROPERTY_NAME = "encryption.wrap.locally";
+  public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "encryption.key.material.internal.storage";
+
+  public static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
+  public static final String KEY_MATERIAL_TYPE = "PKMT1";
+  public static final String KEY_MATERIAL_INTERNAL_STORAGE_FIELD = "internalStorage";
+  public static final String KEY_REFERENCE_FIELD = "keyReference";
+  public static final String DOUBLE_WRAPPING_FIELD = "doubleWrapping";
+
+  public static final String KMS_INSTANCE_ID_FIELD = "kmsInstanceID";
+  public static final String KMS_INSTANCE_URL_FIELD = "kmsInstanceURL";
+
+  public static final String MASTER_KEY_ID_FIELD = "masterKeyID";
+  public static final String WRAPPED_DEK_FIELD = "wrappedDEK";
+  public static final String KEK_ID_FIELD = "keyEncryptionKeyID";
+  public static final String WRAPPED_KEK_FIELD = "wrappedKEK";
+
+  public static final String FOOTER_KEY_ID_IN_FILE = "kf";
+  public static final String KEY_ID_IN_FILE_PREFIX = "k";
+
+  public static final long DEFAULT_CACHE_ENTRY_LIFETIME = 10 * 60; // 10 minutes
+  public static final int INITIAL_PER_TOKEN_CACHE_SIZE = 5;
+
+  // For every token: a map of KMSInstanceId to kmsClient
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, KmsClient>>> kmsClientCachePerToken =
+      new ConcurrentHashMap<>(INITIAL_PER_TOKEN_CACHE_SIZE);
+  private static volatile long lastKmsCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute

Review comment:
       Not a big deal, but if you store the timestamp at which the next cache cleanup will be due, each check for cleanup needs only a comparison instead of an addition and a comparison.
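A minimal sketch of this suggestion, assuming a fixed cleanup interval per scheduler (in the PR the lifetime is passed in on each call, which is one reason the addition currently happens at check time). `CleanupScheduler` and `nextCleanupDeadline` are hypothetical names, and the clock is passed in explicitly so the logic is easy to test. The initial deadline reproduces the PR's behavior: one grace period plus one interval after startup.

```java
// Hypothetical helper: stores the deadline of the next cleanup instead of the
// timestamp of the last one, so the common path is a single comparison.
final class CleanupScheduler {
  private final long intervalMillis;
  // Epoch millis after which the next cleanup should run.
  private volatile long nextCleanupDeadline;

  CleanupScheduler(long intervalMillis, long gracePeriodMillis, long nowMillis) {
    this.intervalMillis = intervalMillis;
    // Same first cleanup time as "lastCleanup = now + grace; check now > lastCleanup + interval".
    this.nextCleanupDeadline = nowMillis + gracePeriodMillis + intervalMillis;
  }

  /** Returns true if the caller should run the cleanup; at most one caller wins per deadline. */
  boolean shouldCleanUp(long nowMillis) {
    if (nowMillis <= nextCleanupDeadline) { // fast path: one comparison, no addition
      return false;
    }
    synchronized (this) {
      if (nowMillis <= nextCleanupDeadline) { // another thread already advanced the deadline
        return false;
      }
      nextCleanupDeadline = nowMillis + intervalMillis; // the addition now happens once per cleanup
      return true;
    }
  }
}
```

The addition moves from every check to the (rare) moment a cleanup actually runs.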

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.AesGcmDecryptor;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.AesMode;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+public class KeyToolkit {
+
+  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "encryption.kms.client.class";
+  public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "encryption.kms.instance.id";
+  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "encryption.double.wrapping";
+  public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "encryption.key.access.token";
+  public static final String TOKEN_LIFETIME_PROPERTY_NAME = "encryption.key.access.token.lifetime";
+  public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "encryption.kms.instance.url";
+  public static final String WRAP_LOCALLY_PROPERTY_NAME = "encryption.wrap.locally";
+  public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "encryption.key.material.internal.storage";

Review comment:
       These properties should be documented in the [README.md](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md) file.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyEncryptionKey;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class FileKeyWrapper {
+
+  public static final int KEK_LENGTH = 16;
+  public static final int KEK_ID_LENGTH = 16;
+
+  // For every token: a map of MEK_ID to (KEK ID and KEK)
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>>> KEKMapPerToken =
+      new ConcurrentHashMap<>(KeyToolkit.INITIAL_PER_TOKEN_CACHE_SIZE);
+  private static volatile long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute;
+
+  //A map of MEK_ID to (KEK ID and KEK) - for the current token
+  private final ConcurrentMap<String, KeyEncryptionKey> KEKPerMasterKeyID;
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private final long cacheEntryLifetime;
+
+  private final KmsClient kmsClient;
+  private final String kmsInstanceID;
+  private final String kmsInstanceURL;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final SecureRandom random;
+  private final boolean doubleWrapping;
+
+  private short keyCounter;
+  private String accessToken;
+
+  public FileKeyWrapper(Configuration configuration, FileKeyMaterialStore keyMaterialStore) {
+    this.hadoopConfiguration = configuration;
+
+    cacheEntryLifetime = 1000l * hadoopConfiguration.getLong(KeyToolkit.TOKEN_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.DEFAULT_CACHE_ENTRY_LIFETIME);
+
+    kmsInstanceID = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME,
+        KmsClient.DEFAULT_KMS_INSTANCE_ID);
+
+    doubleWrapping =  hadoopConfiguration.getBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, true);
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME, KmsClient.DEFAULT_ACCESS_TOKEN);
+
+    kmsClient = KeyToolkit.getKmsClient(kmsInstanceID, configuration, accessToken, cacheEntryLifetime);
+
+    kmsInstanceURL = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_URL_PROPERTY_NAME,
+        RemoteKmsClient.DEFAULT_KMS_INSTANCE_URL);
+
+    this.keyMaterialStore = keyMaterialStore;
+
+    random = new SecureRandom();
+    keyCounter = 0;
+
+    // Check caches upon each file writing (clean once in cacheEntryLifetime)
+    KeyToolkit.checkKmsCacheForExpiredTokens(cacheEntryLifetime);
+    if (doubleWrapping) {
+      checkKekCacheForExpiredTokens();
+
+      ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>> KEKCacheEntry = KEKMapPerToken.get(accessToken);
+      if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+        synchronized (KEKMapPerToken) {
+          KEKCacheEntry = KEKMapPerToken.get(accessToken);
+          if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+            KEKCacheEntry = new ExpiringCacheEntry<>(new ConcurrentHashMap<String, KeyEncryptionKey>(), cacheEntryLifetime);
+            KEKMapPerToken.put(accessToken, KEKCacheEntry);
+          }
+        }
+      }
+      KEKPerMasterKeyID = KEKCacheEntry.getCachedItem();
+    } else {
+      KEKPerMasterKeyID = null;
+    }
+  }
+
+  public byte[] getEncryptionKeyMetadata(byte[] dataKey, String masterKeyID, boolean isFooterKey) {
+    return getEncryptionKeyMetadata(dataKey, masterKeyID, isFooterKey, null);
+  }
+
+  static void removeCacheEntriesForToken(String accessToken) {
+    synchronized(KEKMapPerToken) {

Review comment:
       Since `kekMapPerToken` is a `ConcurrentMap` and already handles concurrency, why do we need synchronization?
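For illustration, a sketch assuming the per-token map is accessed only through `ConcurrentMap` methods (`TokenCache` is a hypothetical name). `remove` is atomic on its own. `clear` is thread-safe too, although the `ConcurrentHashMap` Javadoc notes that aggregate operations like `clear` are not atomic as a whole, so an entry inserted while it runs may remain afterwards. An external `synchronized (map)` block only adds value when it must exclude another block that locks the same monitor, such as the double-checked `get`/`put` in the constructor. Replacing that pattern with an atomic method such as `computeIfAbsent` removes the need for the lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical per-token cache that relies only on ConcurrentMap's own thread safety.
final class TokenCache<V> {
  private final ConcurrentMap<String, V> entriesPerToken = new ConcurrentHashMap<>();

  V getOrCreate(String token, Supplier<V> factory) {
    // computeIfAbsent is atomic per key: it replaces "get, check null, put" under a lock.
    return entriesPerToken.computeIfAbsent(token, t -> factory.get());
  }

  void removeToken(String token) {
    entriesPerToken.remove(token); // atomic; no external lock needed
  }

  void removeAllTokens() {
    entriesPerToken.clear(); // thread-safe, but not a single atomic snapshot
  }

  int size() {
    return entriesPerToken.size();
  }
}
```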

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyEncryptionKey;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class FileKeyWrapper {
+
+  public static final int KEK_LENGTH = 16;
+  public static final int KEK_ID_LENGTH = 16;
+
+  // For every token: a map of MEK_ID to (KEK ID and KEK)
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>>> KEKMapPerToken =
+      new ConcurrentHashMap<>(KeyToolkit.INITIAL_PER_TOKEN_CACHE_SIZE);
+  private static volatile long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute;
+
+  //A map of MEK_ID to (KEK ID and KEK) - for the current token
+  private final ConcurrentMap<String, KeyEncryptionKey> KEKPerMasterKeyID;
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private final long cacheEntryLifetime;
+
+  private final KmsClient kmsClient;
+  private final String kmsInstanceID;
+  private final String kmsInstanceURL;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final SecureRandom random;
+  private final boolean doubleWrapping;
+
+  private short keyCounter;
+  private String accessToken;
+
+  public FileKeyWrapper(Configuration configuration, FileKeyMaterialStore keyMaterialStore) {
+    this.hadoopConfiguration = configuration;
+
+    cacheEntryLifetime = 1000l * hadoopConfiguration.getLong(KeyToolkit.TOKEN_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.DEFAULT_CACHE_ENTRY_LIFETIME);
+
+    kmsInstanceID = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME,
+        KmsClient.DEFAULT_KMS_INSTANCE_ID);
+
+    doubleWrapping =  hadoopConfiguration.getBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, true);
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME, KmsClient.DEFAULT_ACCESS_TOKEN);
+
+    kmsClient = KeyToolkit.getKmsClient(kmsInstanceID, configuration, accessToken, cacheEntryLifetime);
+
+    kmsInstanceURL = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_URL_PROPERTY_NAME,
+        RemoteKmsClient.DEFAULT_KMS_INSTANCE_URL);
+
+    this.keyMaterialStore = keyMaterialStore;
+
+    random = new SecureRandom();
+    keyCounter = 0;
+
+    // Check caches upon each file writing (clean once in cacheEntryLifetime)
+    KeyToolkit.checkKmsCacheForExpiredTokens(cacheEntryLifetime);
+    if (doubleWrapping) {
+      checkKekCacheForExpiredTokens();
+
+      ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>> KEKCacheEntry = KEKMapPerToken.get(accessToken);
+      if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+        synchronized (KEKMapPerToken) {
+          KEKCacheEntry = KEKMapPerToken.get(accessToken);
+          if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+            KEKCacheEntry = new ExpiringCacheEntry<>(new ConcurrentHashMap<String, KeyEncryptionKey>(), cacheEntryLifetime);
+            KEKMapPerToken.put(accessToken, KEKCacheEntry);
+          }
+        }
+      }
+      KEKPerMasterKeyID = KEKCacheEntry.getCachedItem();
+    } else {
+      KEKPerMasterKeyID = null;
+    }
+  }
+
+  public byte[] getEncryptionKeyMetadata(byte[] dataKey, String masterKeyID, boolean isFooterKey) {
+    return getEncryptionKeyMetadata(dataKey, masterKeyID, isFooterKey, null);
+  }
+
+  static void removeCacheEntriesForToken(String accessToken) {
+    synchronized(KEKMapPerToken) {
+      KEKMapPerToken.remove(accessToken);
+    }
+  }
+
+  static void removeCacheEntriesForAllTokens() {
+    synchronized (KEKMapPerToken) {
+      KEKMapPerToken.clear();
+    }
+  }
+
+  private void checkKekCacheForExpiredTokens() {
+    long now = System.currentTimeMillis();
+
+    if (now > (lastKekCacheCleanupTimestamp + cacheEntryLifetime)) {
+      synchronized (KEKMapPerToken) {
+        if (now > (lastKekCacheCleanupTimestamp + cacheEntryLifetime)) {
+          KeyToolkit.removeExpiredEntriesFromCache(KEKMapPerToken);
+          lastKekCacheCleanupTimestamp = now;
+        }
+      }
+    }
+  }
+
+  byte[] getEncryptionKeyMetadata(byte[] dataKey, String masterKeyID, boolean isFooterKey, String keyIdInFile) {
+    if (null == kmsClient) {
+      throw new ParquetCryptoRuntimeException("No KMS client available. See previous errors.");
+    }
+
+    KeyEncryptionKey keyEncryptionKey = null;
+    String encodedWrappedDEK = null;
+    if (!doubleWrapping) {
+      encodedWrappedDEK = kmsClient.wrapKey(dataKey, masterKeyID);
+    } else {
+      // Find in cache, or generate KEK for Master Key ID
+      keyEncryptionKey = KEKPerMasterKeyID.computeIfAbsent(masterKeyID,
+          (k) -> createKeyEncryptionKey(masterKeyID));
+
+      // Encrypt DEK with KEK
+      byte[] AAD = keyEncryptionKey.getID();
+      encodedWrappedDEK = KeyToolkit.wrapKeyLocally(dataKey, keyEncryptionKey.getBytes(), AAD);
+    }
+
+    // Pack all into key material JSON
+    Map<String, String> keyMaterialMap = new HashMap<String, String>(10);
+    keyMaterialMap.put(KeyToolkit.KEY_MATERIAL_TYPE_FIELD, KeyToolkit.KEY_MATERIAL_TYPE);
+    if (isFooterKey) {
+      keyMaterialMap.put(KeyToolkit.KMS_INSTANCE_ID_FIELD, kmsInstanceID);
+      keyMaterialMap.put(KeyToolkit.KMS_INSTANCE_URL_FIELD, kmsInstanceURL);
+    }
+    if (null == keyMaterialStore) {
+      keyMaterialMap.put(KeyToolkit.KEY_MATERIAL_INTERNAL_STORAGE_FIELD, "true"); // TODO use/check

Review comment:
       I don't like TODOs in final code, and I don't even understand this one. Please either remove the TODO or rephrase it so it is clear what should be done later.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyEncryptionKey;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class FileKeyWrapper {
+
+  public static final int KEK_LENGTH = 16;
+  public static final int KEK_ID_LENGTH = 16;
+
+  // For every token: a map of MEK_ID to (KEK ID and KEK)
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>>> KEKMapPerToken =
+      new ConcurrentHashMap<>(KeyToolkit.INITIAL_PER_TOKEN_CACHE_SIZE);
+  private static volatile long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute;
+
+  //A map of MEK_ID to (KEK ID and KEK) - for the current token
+  private final ConcurrentMap<String, KeyEncryptionKey> KEKPerMasterKeyID;
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private final long cacheEntryLifetime;
+
+  private final KmsClient kmsClient;
+  private final String kmsInstanceID;
+  private final String kmsInstanceURL;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final SecureRandom random;
+  private final boolean doubleWrapping;
+
+  private short keyCounter;
+  private String accessToken;
+
+  public FileKeyWrapper(Configuration configuration, FileKeyMaterialStore keyMaterialStore) {
+    this.hadoopConfiguration = configuration;
+
+    cacheEntryLifetime = 1000l * hadoopConfiguration.getLong(KeyToolkit.TOKEN_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.DEFAULT_CACHE_ENTRY_LIFETIME);
+
+    kmsInstanceID = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME,
+        KmsClient.DEFAULT_KMS_INSTANCE_ID);
+
+    doubleWrapping =  hadoopConfiguration.getBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, true);
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME, KmsClient.DEFAULT_ACCESS_TOKEN);
+
+    kmsClient = KeyToolkit.getKmsClient(kmsInstanceID, configuration, accessToken, cacheEntryLifetime);
+
+    kmsInstanceURL = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_URL_PROPERTY_NAME,
+        RemoteKmsClient.DEFAULT_KMS_INSTANCE_URL);
+
+    this.keyMaterialStore = keyMaterialStore;
+
+    random = new SecureRandom();
+    keyCounter = 0;
+
+    // Check caches upon each file writing (clean once in cacheEntryLifetime)
+    KeyToolkit.checkKmsCacheForExpiredTokens(cacheEntryLifetime);
+    if (doubleWrapping) {
+      checkKekCacheForExpiredTokens();
+
+      ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>> KEKCacheEntry = KEKMapPerToken.get(accessToken);
+      if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+        synchronized (KEKMapPerToken) {
+          KEKCacheEntry = KEKMapPerToken.get(accessToken);
+          if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+            KEKCacheEntry = new ExpiringCacheEntry<>(new ConcurrentHashMap<String, KeyEncryptionKey>(), cacheEntryLifetime);
+            KEKMapPerToken.put(accessToken, KEKCacheEntry);
+          }
+        }
+      }
+      KEKPerMasterKeyID = KEKCacheEntry.getCachedItem();

Review comment:
       I am not sure it would work as expected, but it might be worth a try to avoid synchronization, since you already use a `ConcurrentMap`:
   ```java
   kekPerMasterKeyID = kekMapPerToken.compute(accessToken, (token, kekCacheEntry) -> {
     if ((null == kekCacheEntry) || kekCacheEntry.isExpired()) {
       return new ExpiringCacheEntry<>(new ConcurrentHashMap<String, KeyEncryptionKey>(), cacheEntryLifetime);
     } else {
       return kekCacheEntry;
     }
   }).getCachedItem();
   ```
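To check whether the `compute` suggestion behaves as hoped, here is a self-contained sketch under stated assumptions: `SimpleExpiringEntry` is a hypothetical stand-in for the PR's `ExpiringCacheEntry` (its real internals are not shown here), `byte[]` stands in for `KeyEncryptionKey`, `PerTokenKekCache` is a hypothetical wrapper, and the current time is passed in explicitly. `ConcurrentHashMap.compute` runs the remapping function atomically for the key, so the missing-or-expired check and the replacement happen without an external lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ExpiringCacheEntry: a value plus an expiration time.
final class SimpleExpiringEntry<T> {
  private final T item;
  private final long expirationMillis;

  SimpleExpiringEntry(T item, long lifetimeMillis, long nowMillis) {
    this.item = item;
    this.expirationMillis = nowMillis + lifetimeMillis;
  }

  boolean isExpired(long nowMillis) {
    return nowMillis > expirationMillis;
  }

  T getCachedItem() {
    return item;
  }
}

// Hypothetical per-token KEK cache using compute() instead of double-checked locking.
final class PerTokenKekCache {
  private final ConcurrentMap<String, SimpleExpiringEntry<ConcurrentMap<String, byte[]>>> kekMapPerToken =
      new ConcurrentHashMap<>();
  private final long cacheEntryLifetime;

  PerTokenKekCache(long cacheEntryLifetime) {
    this.cacheEntryLifetime = cacheEntryLifetime;
  }

  // Atomically returns the live per-token map, replacing a missing or expired entry.
  ConcurrentMap<String, byte[]> kekPerMasterKeyID(String accessToken, long nowMillis) {
    return kekMapPerToken.compute(accessToken, (token, kekCacheEntry) -> {
      if ((null == kekCacheEntry) || kekCacheEntry.isExpired(nowMillis)) {
        return new SimpleExpiringEntry<>(new ConcurrentHashMap<String, byte[]>(), cacheEntryLifetime, nowMillis);
      } else {
        return kekCacheEntry;
      }
    }).getCachedItem();
  }
}
```

One caveat from the `ConcurrentHashMap.compute` contract: other updates to the map may block while the remapping function runs, so the function should stay short and must not update other mappings of the same map. Creating an empty `ConcurrentHashMap` satisfies both conditions.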

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyEncryptionKey;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class FileKeyWrapper {
+
+  public static final int KEK_LENGTH = 16;
+  public static final int KEK_ID_LENGTH = 16;
+
+  // For every token: a map of MEK_ID to (KEK ID and KEK)
+  private static final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>>> KEKMapPerToken =
+      new ConcurrentHashMap<>(KeyToolkit.INITIAL_PER_TOKEN_CACHE_SIZE);
+  private static volatile long lastKekCacheCleanupTimestamp = System.currentTimeMillis() + 60l * 1000; // grace period of 1 minute;
+
+  //A map of MEK_ID to (KEK ID and KEK) - for the current token
+  private final ConcurrentMap<String, KeyEncryptionKey> KEKPerMasterKeyID;
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private final long cacheEntryLifetime;
+
+  private final KmsClient kmsClient;
+  private final String kmsInstanceID;
+  private final String kmsInstanceURL;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final SecureRandom random;
+  private final boolean doubleWrapping;
+
+  private short keyCounter;
+  private String accessToken;
+
+  public FileKeyWrapper(Configuration configuration, FileKeyMaterialStore keyMaterialStore) {
+    this.hadoopConfiguration = configuration;
+
+    cacheEntryLifetime = 1000l * hadoopConfiguration.getLong(KeyToolkit.TOKEN_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.DEFAULT_CACHE_ENTRY_LIFETIME);
+
+    kmsInstanceID = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME,
+        KmsClient.DEFAULT_KMS_INSTANCE_ID);
+
+    doubleWrapping =  hadoopConfiguration.getBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, true);
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME, KmsClient.DEFAULT_ACCESS_TOKEN);
+
+    kmsClient = KeyToolkit.getKmsClient(kmsInstanceID, configuration, accessToken, cacheEntryLifetime);
+
+    kmsInstanceURL = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_URL_PROPERTY_NAME,
+        RemoteKmsClient.DEFAULT_KMS_INSTANCE_URL);
+
+    this.keyMaterialStore = keyMaterialStore;
+
+    random = new SecureRandom();
+    keyCounter = 0;
+
+    // Check caches upon each file writing (clean once in cacheEntryLifetime)
+    KeyToolkit.checkKmsCacheForExpiredTokens(cacheEntryLifetime);
+    if (doubleWrapping) {
+      checkKekCacheForExpiredTokens();
+
+      ExpiringCacheEntry<ConcurrentMap<String, KeyEncryptionKey>> KEKCacheEntry = KEKMapPerToken.get(accessToken);
+      if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+        synchronized (KEKMapPerToken) {
+          KEKCacheEntry = KEKMapPerToken.get(accessToken);
+          if ((null == KEKCacheEntry) || KEKCacheEntry.isExpired()) {
+            KEKCacheEntry = new ExpiringCacheEntry<>(new ConcurrentHashMap<String, KeyEncryptionKey>(), cacheEntryLifetime);
+            KEKMapPerToken.put(accessToken, KEKCacheEntry);
+          }
+        }
+      }
+      KEKPerMasterKeyID = KEKCacheEntry.getCachedItem();
+    } else {
+      KEKPerMasterKeyID = null;
+    }
+  }
+
+  public byte[] getEncryptionKeyMetadata(byte[] dataKey, String masterKeyID, boolean isFooterKey) {
+    return getEncryptionKeyMetadata(dataKey, masterKeyID, isFooterKey, null);
+  }
+
+  static void removeCacheEntriesForToken(String accessToken) {
+    synchronized(KEKMapPerToken) {
+      KEKMapPerToken.remove(accessToken);
+    }
+  }
+
+  static void removeCacheEntriesForAllTokens() {
+    synchronized (KEKMapPerToken) {

Review comment:
       Do we need synchronization here?

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.AesGcmDecryptor;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.AesMode;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+public class KeyToolkit {
+
+  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "encryption.kms.client.class";
+  public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "encryption.kms.instance.id";
+  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "encryption.double.wrapping";
+  public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "encryption.key.access.token";
+  public static final String TOKEN_LIFETIME_PROPERTY_NAME = "encryption.key.access.token.lifetime";
+  public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "encryption.kms.instance.url";
+  public static final String WRAP_LOCALLY_PROPERTY_NAME = "encryption.wrap.locally";
+  public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "encryption.key.material.internal.storage";
+
+  public static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
+  public static final String KEY_MATERIAL_TYPE = "PKMT1";
+  public static final String KEY_MATERIAL_INTERNAL_STORAGE_FIELD = "internalStorage";
+  public static final String KEY_REFERENCE_FIELD = "keyReference";
+  public static final String DOUBLE_WRAPPING_FIELD = "doubleWrapping";
+
+  public static final String KMS_INSTANCE_ID_FIELD = "kmsInstanceID";
+  public static final String KMS_INSTANCE_URL_FIELD = "kmsInstanceURL";
+
+  public static final String MASTER_KEY_ID_FIELD = "masterKeyID";
+  public static final String WRAPPED_DEK_FIELD = "wrappedDEK";
+  public static final String KEK_ID_FIELD = "keyEncryptionKeyID";
+  public static final String WRAPPED_KEK_FIELD = "wrappedKEK";
+
+  public static final String FOOTER_KEY_ID_IN_FILE = "kf";
+  public static final String KEY_ID_IN_FILE_PREFIX = "k";

Review comment:
       Don't we want to use more descriptive IDs?

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;

Review comment:
       nit:
   ```suggestion
   package org.apache.parquet.crypto.keytools;
   ```

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.AesGcmDecryptor;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.AesMode;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+public class KeyToolkit {
+
+  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "encryption.kms.client.class";
+  public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "encryption.kms.instance.id";
+  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "encryption.double.wrapping";
+  public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "encryption.key.access.token";
+  public static final String TOKEN_LIFETIME_PROPERTY_NAME = "encryption.key.access.token.lifetime";
+  public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "encryption.kms.instance.url";
+  public static final String WRAP_LOCALLY_PROPERTY_NAME = "encryption.wrap.locally";
+  public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "encryption.key.material.internal.storage";
+
+  public static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
+  public static final String KEY_MATERIAL_TYPE = "PKMT1";
+  public static final String KEY_MATERIAL_INTERNAL_STORAGE_FIELD = "internalStorage";
+  public static final String KEY_REFERENCE_FIELD = "keyReference";
+  public static final String DOUBLE_WRAPPING_FIELD = "doubleWrapping";
+
+  public static final String KMS_INSTANCE_ID_FIELD = "kmsInstanceID";
+  public static final String KMS_INSTANCE_URL_FIELD = "kmsInstanceURL";
+
+  public static final String MASTER_KEY_ID_FIELD = "masterKeyID";
+  public static final String WRAPPED_DEK_FIELD = "wrappedDEK";
+  public static final String KEK_ID_FIELD = "keyEncryptionKeyID";
+  public static final String WRAPPED_KEK_FIELD = "wrappedKEK";
+
+  public static final String FOOTER_KEY_ID_IN_FILE = "kf";
+  public static final String KEY_ID_IN_FILE_PREFIX = "k";
+
+  public static final long DEFAULT_CACHE_ENTRY_LIFETIME = 10 * 60; // 10 minutes
+  public static final int INITIAL_PER_TOKEN_CACHE_SIZE = 5;

Review comment:
       I am not sure if we need all of these constants to be public.
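One possible way to narrow the API surface, as a sketch only: keep the configuration property names that users set in the Hadoop `Configuration` public, and make the key material JSON field names and internal tuning values package-private. The trimmed-down `KeyToolkit` below is hypothetical, the assignment of each constant to a group is an assumption (it presumes the JSON field names are only used inside `org.apache.parquet.crypto.keytools`), and `ConstantVisibilitySketch` is an illustrative name.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical trimmed-down KeyToolkit: which constants go in each group is an assumption.
final class KeyToolkit {
  // Configuration property names: users set these in the Hadoop Configuration,
  // so they remain part of the public API.
  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "encryption.kms.client.class";
  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "encryption.double.wrapping";

  // Key material JSON field names: assumed to be read and written only inside
  // the keytools package, so package-private visibility is enough.
  static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
  static final String WRAPPED_DEK_FIELD = "wrappedDEK";

  // Internal tuning value, not meant for callers.
  static final int INITIAL_PER_TOKEN_CACHE_SIZE = 5;

  private KeyToolkit() { }
}

public class ConstantVisibilitySketch {
  public static void main(String[] args) {
    // Print each constant with its visibility to review the resulting API surface.
    for (Field field : KeyToolkit.class.getDeclaredFields()) {
      boolean isPublic = Modifier.isPublic(field.getModifiers());
      System.out.println(field.getName() + (isPublic ? " (public)" : " (package-private)"));
    }
  }
}
```

Package-private constants remain usable by every class in `org.apache.parquet.crypto.keytools`, so the split only affects code outside the package.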

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class HadoopFSKeyMaterialStore implements FileKeyMaterialStore {
+  
+  public final static String KEY_MATERIAL_FILE_PREFIX = "_KEY_MATERIAL_FOR_";
+  public static final String TEMP_FILE_PREFIX = "_TMP";
+  public final static String KEY_MATERIAL_FILE_SUFFFIX = ".json";
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private FileSystem hadoopFileSystem;
+  private Map<String, String> keyMaterialMap;
+  private Path keyMaterialFile;
+  
+  HadoopFSKeyMaterialStore(FileSystem hadoopFileSystem) {
+    this.hadoopFileSystem = hadoopFileSystem;
+  }
+
+  @Override
+  public void initialize(Path parquetFilePath, Configuration hadoopConfig, boolean tempStore) {
+    String fullPrefix = (tempStore? TEMP_FILE_PREFIX : "");
+    fullPrefix += KEY_MATERIAL_FILE_PREFIX;
+    keyMaterialFile = new Path(parquetFilePath.getParent(),
+      fullPrefix + parquetFilePath.getName() + KEY_MATERIAL_FILE_SUFFFIX);
+  }
+
+  @Override
+  public void addKeyMaterial(String keyIDInFile, String keyMaterial) throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      keyMaterialMap = new HashMap<>();
+    }
+    keyMaterialMap.put(keyIDInFile, keyMaterial);
+  }
+
+  @Override
+  public String getKeyMaterial(String keyIDInFile)  throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      loadKeyMaterialMap();
+    }
+    return keyMaterialMap.get(keyIDInFile);
+  }
+  
+  private void loadKeyMaterialMap() {
+    try (FSDataInputStream keyMaterialStream = hadoopFileSystem.open(keyMaterialFile)) {
+      JsonNode keyMaterialJson = objectMapper.readTree(keyMaterialStream);
+      keyMaterialMap = objectMapper.readValue(keyMaterialJson,
+        new TypeReference<Map<String, String>>() { });
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to get key material from " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public void saveMaterial() throws ParquetCryptoRuntimeException {
+    // TODO needed? Path qualifiedPath = parquetFilePath.makeQualified(hadoopFileSystem);
+    try (FSDataOutputStream keyMaterialStream = hadoopFileSystem.create(keyMaterialFile)) {
+      objectMapper.writeValue(keyMaterialStream, keyMaterialMap);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to save key material in " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public Set<String> getKeyIDSet() throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      loadKeyMaterialMap();
+    }
+
+    return keyMaterialMap.keySet();
+  }
+
+  @Override
+  public void removeMaterial() throws ParquetCryptoRuntimeException {
+    try {
+      hadoopFileSystem.delete(keyMaterialFile, false);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to delete file " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public void moveMaterialTo(FileKeyMaterialStore keyMaterialStore) throws ParquetCryptoRuntimeException {
+    HadoopFSKeyMaterialStore targetStore = (HadoopFSKeyMaterialStore) keyMaterialStore;

Review comment:
       At a minimum, we should document that it only supports this implementation of `FileKeyMaterialStore`. It would also be nice to throw a specific exception instead of a `ClassCastException` (e.g. an `IllegalArgumentException` with a proper message).
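A minimal sketch of the suggested check: test the target type with `instanceof` before casting and throw an `IllegalArgumentException` with a descriptive message. The interface and class below are simplified, hypothetical stand-ins for the PR's types (only `moveMaterialTo` is modeled), so the snippet compiles on its own; the actual file-moving logic is omitted.

```java
// Hypothetical, simplified stand-ins for the PR's types so the sketch compiles on its own.
interface FileKeyMaterialStore {
  void moveMaterialTo(FileKeyMaterialStore keyMaterialStore);
}

class HadoopFSKeyMaterialStore implements FileKeyMaterialStore {
  /**
   * Moves this store's key material to the given target store.
   * Only HadoopFSKeyMaterialStore targets are supported.
   *
   * @throws IllegalArgumentException if the target is not a HadoopFSKeyMaterialStore
   */
  @Override
  public void moveMaterialTo(FileKeyMaterialStore keyMaterialStore) {
    // instanceof is false for null, so a null target is rejected here as well.
    if (!(keyMaterialStore instanceof HadoopFSKeyMaterialStore)) {
      String actual = (keyMaterialStore == null) ? "null" : keyMaterialStore.getClass().getName();
      throw new IllegalArgumentException(
          "Only HadoopFSKeyMaterialStore targets are supported, got: " + actual);
    }
    HadoopFSKeyMaterialStore targetStore = (HadoopFSKeyMaterialStore) keyMaterialStore;
    // ... remaining logic that moves the material into targetStore is omitted ...
  }
}

public class MoveMaterialSketch {
  public static void main(String[] args) {
    HadoopFSKeyMaterialStore source = new HadoopFSKeyMaterialStore();
    source.moveMaterialTo(new HadoopFSKeyMaterialStore()); // supported target: no exception
    try {
      source.moveMaterialTo(target -> { }); // some other FileKeyMaterialStore implementation
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The Javadoc covers the documentation part of the comment, and the explicit check turns a misuse into an error that names the unsupported class.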




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Encryption key management tools 
> --------------------------------
>
>                 Key: PARQUET-1373
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1373
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>            Reporter: Gidon Gershinsky
>            Assignee: Gidon Gershinsky
>            Priority: Major
>
> Parquet Modular Encryption
> ([PARQUET-1178|https://issues.apache.org/jira/browse/PARQUET-1178]) provides
> an API that accepts keys, arbitrary key metadata and key retrieval callbacks,
> which allows implementing basically any key management policy on top of it.
> This Jira will add tools that implement a set of best-practice elements for
> key management. This is not an end-to-end key management solution, but rather
> a set of components that might simplify the design and development of an
> end-to-end solution.
> This toolset is one of many possible ones. The goal is not to create a single or
> “standard” toolkit for Parquet encryption keys. Parquet has a Crypto Factory
> interface ([PARQUET-1817|https://issues.apache.org/jira/browse/PARQUET-1817])
> that allows plugging in different implementations of encryption key management.
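The `KeyToolkit` constants in the diff above (`WRAP_LOCALLY_PROPERTY_NAME`, `WRAPPED_DEK_FIELD`, `WRAPPED_KEK_FIELD`) and the `AesGcmEncryptor` import suggest that the toolkit wraps data encryption keys (DEKs) with AES-GCM. The standalone sketch below illustrates that idea with plain `javax.crypto`; it is not the PR's implementation, and the class name, the `nonce || ciphertext` layout, and the use of the wrapping key ID as AAD are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class LocalKeyWrapSketch {
  private static final int NONCE_LENGTH = 12;      // 96-bit GCM nonce
  private static final int TAG_LENGTH_BITS = 128;  // full-length GCM tag
  private static final SecureRandom RANDOM = new SecureRandom();

  /** Encrypts keyToWrap with wrappingKey (AES-GCM); returns Base64(nonce || ciphertext+tag). */
  static String wrap(byte[] keyToWrap, byte[] wrappingKey, String wrappingKeyId)
      throws GeneralSecurityException {
    byte[] nonce = new byte[NONCE_LENGTH];
    RANDOM.nextBytes(nonce);
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(wrappingKey, "AES"),
        new GCMParameterSpec(TAG_LENGTH_BITS, nonce));
    // Authenticate the wrapping key ID so a wrapped key cannot be attributed to another key.
    cipher.updateAAD(wrappingKeyId.getBytes(StandardCharsets.UTF_8));
    byte[] ciphertext = cipher.doFinal(keyToWrap);
    return Base64.getEncoder().encodeToString(
        ByteBuffer.allocate(nonce.length + ciphertext.length).put(nonce).put(ciphertext).array());
  }

  /** Reverses wrap(); throws AEADBadTagException if the key, key ID or data do not match. */
  static byte[] unwrap(String wrappedKey, byte[] wrappingKey, String wrappingKeyId)
      throws GeneralSecurityException {
    byte[] decoded = Base64.getDecoder().decode(wrappedKey);
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(wrappingKey, "AES"),
        new GCMParameterSpec(TAG_LENGTH_BITS, decoded, 0, NONCE_LENGTH));
    cipher.updateAAD(wrappingKeyId.getBytes(StandardCharsets.UTF_8));
    return cipher.doFinal(decoded, NONCE_LENGTH, decoded.length - NONCE_LENGTH);
  }

  public static void main(String[] args) throws GeneralSecurityException {
    byte[] masterKey = new byte[16]; // in a real deployment this stays in the KMS
    RANDOM.nextBytes(masterKey);
    byte[] dek = new byte[16];       // key that would encrypt Parquet modules
    RANDOM.nextBytes(dek);

    String wrappedDek = wrap(dek, masterKey, "hypotheticalMasterKeyId");
    byte[] recovered = unwrap(wrappedDek, masterKey, "hypotheticalMasterKeyId");
    System.out.println("round trip ok: " + Arrays.equals(dek, recovered)); // prints "round trip ok: true"
  }
}
```

With double wrapping, the same `wrap` step would be applied twice: the DEK wrapped by a key encryption key (KEK), and the KEK wrapped by the master key, so that the KMS is contacted per KEK rather than per DEK.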



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
