This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch release-1.16.0.1-acs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 73bc064dbbf1dbaaf6bc5204bf5ef6b34fbc143f
Author: Marton Balassi <mar...@apple.com>
AuthorDate: Wed Jan 5 21:45:24 2022 +0100

    [apple][internal] Add basic parquet encryption support + test
---
 flink-formats/flink-parquet/pom.xml                |  6 ++
 .../parquet/ParquetStreamingEncryptionITCase.java  | 77 ++++++++++++++++++++++
 flink-formats/flink-sql-parquet/pom.xml            |  1 +
 flink-formats/pom.xml                              |  3 +-
 4 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml
index 61f13ce2ca1..2048bddcc41 100644
--- a/flink-formats/flink-parquet/pom.xml
+++ b/flink-formats/flink-parquet/pom.xml
@@ -87,6 +87,12 @@ under the License.
                        </exclusions>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.parquet</groupId>
+                       <artifactId>parquet-kms-apple</artifactId>
+                       <version>${flink.format.parquet.kms.version}</version>
+               </dependency>
+
                <!-- Hadoop is needed by Parquet -->
                <dependency>
                        <groupId>org.apache.hadoop</groupId>
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetStreamingEncryptionITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetStreamingEncryptionITCase.java
new file mode 100644
index 00000000000..105787449f6
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetStreamingEncryptionITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ITCase that extends {@link FsStreamingSinkITCaseBase} and configures the Parquet format
+ * ({@link ParquetFileFormatFactory}) with gzip compression and Parquet encryption options.
+ *
+ * <p>This class declares no test methods of its own; it only overrides
+ * {@link #additionalProperties()}, presumably consumed by the tests inherited from the base
+ * class (the base class is not visible here; confirm).
+ *
+ * <p>{@link #generateKey()} runs once before the class: it creates a temporary directory and a
+ * {@code test.key} file whose content is the file's own absolute path followed by
+ * {@code KEY_CONTENT}. {@link #cleanup()} deletes that directory after the class. The same
+ * absolute path is passed as both {@code parquet.encryption.key.file} and
+ * {@code parquet.encryption.footer.key}.
+ */
+public class ParquetStreamingEncryptionITCase extends 
FsStreamingSinkITCaseBase {
+
+    private static final String KEY_CONTENT = 
":v0:XiK8QX/OJme4yjH8H5tinvtA2N7OeMeSRvBXaoDfinE=,\n"; // appended after the key file's absolute path; presumably "<key id>:<version>:<base64 key>," - TODO confirm
+
+    private static Path tmpDir;
+    private static File keyFile;
+
+    @BeforeClass
+    public static void generateKey() throws IOException {
+        tmpDir = Files.createTempDirectory("encryption").toAbsolutePath();
+        keyFile = new File(tmpDir.toFile(), "test.key");
+        keyFile.createNewFile(); // NOTE(review): the boolean result is ignored
+        FileUtils.writeFileUtf8(keyFile, keyFile.getAbsolutePath() + 
KEY_CONTENT); // file content = "<absolute path of test.key>" + KEY_CONTENT
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+    }
+
+    @Override
+    public String[] additionalProperties() {
+        String keyPath = keyFile.getAbsolutePath();
+        List<String> ret = new ArrayList<>();
+        ret.add("'format'='parquet'");
+        ret.add("'parquet.compression'='gzip'");
+
+        // Enable encryption: crypto factory class, KMS client class and the generated key file
+        ret.add(
+                
"'parquet.crypto.factory.class'='com.apple.parquet.crypto.keytools.AppleCryptoFactory'");
+        ret.add(
+                
"'parquet.encryption.kms.client.class'='com.apple.parquet.crypto.keytools.CustomerKmsBridge'");
+        ret.add(String.format("'parquet.encryption.key.file'='%s'", keyPath));
+        // 
ret.add("'parquet.encryption.key.material.store.internally'='false'");
+        ret.add("'parquet.encryption.double.wrapping'='false'");
+
+        // Configure uniform encryption; footer key = key file path (see generateKey)
+        ret.add("'parquet.uniform.encryption'='true'");
+        ret.add(String.format("'parquet.encryption.footer.key'='%s'", 
keyPath));
+
+        return ret.toArray(new String[0]);
+    }
+}
diff --git a/flink-formats/flink-sql-parquet/pom.xml b/flink-formats/flink-sql-parquet/pom.xml
index 4100b803217..c402030d3e5 100644
--- a/flink-formats/flink-sql-parquet/pom.xml
+++ b/flink-formats/flink-sql-parquet/pom.xml
@@ -85,6 +85,7 @@ under the License.
                                                                        <include>org.apache.parquet:parquet-encoding</include>
                                                                        <include>org.apache.parquet:parquet-format-structures</include>
                                                                        <include>org.apache.parquet:parquet-jackson</include>
+                                                                       <include>org.apache.parquet:parquet-kms-apple</include>
                                                                        <include>org.codehaus.jackson:jackson-core-asl</include>
                                                                        <include>org.codehaus.jackson:jackson-mapper-asl</include>
                                                                        <include>commons-pool:commons-pool</include>
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 6c303d14b59..7b1acb7b7af 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -30,8 +30,9 @@ under the License.
        </parent>
 
        <properties>
-               <flink.format.parquet.version>1.12.2</flink.format.parquet.version>
                <guava.version>30.0-jre</guava.version>
+               <flink.format.parquet.version>1.12.0.15-apple</flink.format.parquet.version>
+               <flink.format.parquet.kms.version>1.1.3-apple</flink.format.parquet.kms.version>
        </properties>
 
        <artifactId>flink-formats</artifactId>

Reply via email to