This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch release-1.16.0.1-acs in repository https://gitbox.apache.org/repos/asf/flink.git
commit 73bc064dbbf1dbaaf6bc5204bf5ef6b34fbc143f
Author: Marton Balassi <mar...@apple.com>
AuthorDate: Wed Jan 5 21:45:24 2022 +0100

    [apple][internal] Add basic parquet encryption support + test
---
 flink-formats/flink-parquet/pom.xml                |  6 ++
 .../parquet/ParquetStreamingEncryptionITCase.java  | 77 ++++++++++++++++++++++
 flink-formats/flink-sql-parquet/pom.xml            |  1 +
 flink-formats/pom.xml                              |  3 +-
 4 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml
index 61f13ce2ca1..2048bddcc41 100644
--- a/flink-formats/flink-parquet/pom.xml
+++ b/flink-formats/flink-parquet/pom.xml
@@ -87,6 +87,12 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.parquet</groupId>
+			<artifactId>parquet-kms-apple</artifactId>
+			<version>${flink.format.parquet.kms.version}</version>
+		</dependency>
+
 		<!-- Hadoop is needed by Parquet -->
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetStreamingEncryptionITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetStreamingEncryptionITCase.java
new file mode 100644
index 00000000000..105787449f6
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetStreamingEncryptionITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Checkpoint ITCase for {@link ParquetFileFormatFactory}. */
+public class ParquetStreamingEncryptionITCase extends FsStreamingSinkITCaseBase {
+
+    private static final String KEY_CONTENT = ":v0:XiK8QX/OJme4yjH8H5tinvtA2N7OeMeSRvBXaoDfinE=,\n";
+
+    private static Path tmpDir;
+    private static File keyFile;
+
+    @BeforeClass
+    public static void generateKey() throws IOException {
+        tmpDir = Files.createTempDirectory("encryption").toAbsolutePath();
+        keyFile = new File(tmpDir.toFile(), "test.key");
+        keyFile.createNewFile();
+        FileUtils.writeFileUtf8(keyFile, keyFile.getAbsolutePath() + KEY_CONTENT);
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+    }
+
+    @Override
+    public String[] additionalProperties() {
+        String keyPath = keyFile.getAbsolutePath();
+        List<String> ret = new ArrayList<>();
+        ret.add("'format'='parquet'");
+        ret.add("'parquet.compression'='gzip'");
+
+        // Enable encryption
+        ret.add(
+                "'parquet.crypto.factory.class'='com.apple.parquet.crypto.keytools.AppleCryptoFactory'");
+        ret.add(
+                "'parquet.encryption.kms.client.class'='com.apple.parquet.crypto.keytools.CustomerKmsBridge'");
+        ret.add(String.format("'parquet.encryption.key.file'='%s'", keyPath));
+        // ret.add("'parquet.encryption.key.material.store.internally'='false'");
+        ret.add("'parquet.encryption.double.wrapping'='false'");
+
+        // Configure uniform encryption
+        ret.add("'parquet.uniform.encryption'='true'");
+        ret.add(String.format("'parquet.encryption.footer.key'='%s'", keyPath));
+
+        return ret.toArray(new String[0]);
+    }
+}
diff --git a/flink-formats/flink-sql-parquet/pom.xml b/flink-formats/flink-sql-parquet/pom.xml
index 4100b803217..c402030d3e5 100644
--- a/flink-formats/flink-sql-parquet/pom.xml
+++ b/flink-formats/flink-sql-parquet/pom.xml
@@ -85,6 +85,7 @@ under the License.
 									<include>org.apache.parquet:parquet-encoding</include>
 									<include>org.apache.parquet:parquet-format-structures</include>
 									<include>org.apache.parquet:parquet-jackson</include>
+									<include>org.apache.parquet:parquet-kms-apple</include>
 									<include>org.codehaus.jackson:jackson-core-asl</include>
 									<include>org.codehaus.jackson:jackson-mapper-asl</include>
 									<include>commons-pool:commons-pool</include>
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 6c303d14b59..7b1acb7b7af 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -30,8 +30,9 @@ under the License.
 	</parent>
 
 	<properties>
-		<flink.format.parquet.version>1.12.2</flink.format.parquet.version>
 		<guava.version>30.0-jre</guava.version>
+		<flink.format.parquet.version>1.12.0.15-apple</flink.format.parquet.version>
+		<flink.format.parquet.kms.version>1.1.3-apple</flink.format.parquet.kms.version>
 	</properties>
 
 	<artifactId>flink-formats</artifactId>