flyrain commented on a change in pull request #3053:
URL: https://github.com/apache/iceberg/pull/3053#discussion_r709723506
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
##########
@@ -694,9 +757,17 @@ public static ReadBuilder read(InputFile file) {
private boolean reuseContainers = false;
private int maxRecordsPerBatch = 10000;
private NameMapping nameMapping = null;
+ private FileDecryptionProperties fileDecryptionProperties = null;
private ReadBuilder(InputFile file) {
this.file = file;
+ if (file instanceof EnvelopeEncryptedInputFile) {
+ EnvelopeEncryptedInputFile envelopeEncryptedInputFile = (EnvelopeEncryptedInputFile) file;
+ if (envelopeEncryptedInputFile.useNativeDecryption()) {
+ fileDecryptionProperties =
+ createDecryptionProperties(envelopeEncryptedInputFile.nativeDecryptionParameters());
Review comment:
Nit: indentation is off.
##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
##########
@@ -82,6 +95,104 @@ public TableMetadata current() {
return currentMetadata;
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public EncryptionManager encryption() {
Review comment:
Can we extract most of the logic here into a method of a utility class?
That logic could be reused by other operations.
##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
##########
@@ -82,6 +95,104 @@ public TableMetadata current() {
return currentMetadata;
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public EncryptionManager encryption() {
+ // TODO run by single thread? or synchronize?
+ if (null != encryptionManager) {
+ return encryptionManager;
+ }
+
+ TableMetadata tableMetadata = current();
+ Map<String, String> tableProperties = tableMetadata.properties();
+
+ String keyManagerType = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_MANAGER_TYPE, TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT);
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) {
+ encryptionManager = new PlaintextEncryptionManager();
+ return encryptionManager;
+ }
+
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE) ||
+ keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE)) {
+ boolean doubleWrap = keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE);
+ if (doubleWrap) {
+ throw new RuntimeException("Double envelope encryption is not supported yet");
+ }
+
+ Schema tableSchema = tableMetadata.schema();
+
+ String tableKeyId = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_TABLE_KEY, null);
+ if (null == tableKeyId) {
+ throw new RuntimeException("Table encryption key is not specified");
+ }
Review comment:
We usually write this as `Preconditions.checkArgument(tableKeyId != null, "Table encryption key is not specified")`.
##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
##########
@@ -82,6 +95,104 @@ public TableMetadata current() {
return currentMetadata;
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public EncryptionManager encryption() {
+ // TODO run by single thread? or synchronize?
+ if (null != encryptionManager) {
+ return encryptionManager;
+ }
+
+ TableMetadata tableMetadata = current();
+ Map<String, String> tableProperties = tableMetadata.properties();
+
+ String keyManagerType = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_MANAGER_TYPE, TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT);
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) {
+ encryptionManager = new PlaintextEncryptionManager();
+ return encryptionManager;
+ }
+
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE) ||
+ keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE)) {
+ boolean doubleWrap = keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE);
+ if (doubleWrap) {
+ throw new RuntimeException("Double envelope encryption is not supported yet");
+ }
Review comment:
We could just remove this logic, since double envelope isn't supported
anyway, and at the end of the method we throw an exception for any
unsupported encryption type.
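Taken together with the earlier suggestion to move this logic into a utility class, the type dispatch could look roughly like the following sketch. The class `EncryptionManagerTypes` and its `Kind` enum are hypothetical; only the property key and its values come from the `TableProperties` constants in this PR. Because the `default` branch rejects every unrecognized value, `double.envelope` needs no dedicated check. The sketch throws `UnsupportedOperationException`; the exception the PR throws at the end of the method is not visible in this excerpt.

```java
import java.util.Map;

/**
 * Hypothetical helper (not in the PR): resolves the encryption manager type from
 * table properties so any TableOperations implementation can share the same rules.
 */
final class EncryptionManagerTypes {
  // Key and values mirror the TableProperties constants added in this PR.
  static final String ENCRYPTION_MANAGER_TYPE = "encryption.manager.type";
  static final String TYPE_PLAINTEXT = "plaintext";
  static final String TYPE_SINGLE_ENVELOPE = "envelope";

  /** The manager kinds this sketch knows how to build. */
  enum Kind { PLAINTEXT, SINGLE_ENVELOPE }

  private EncryptionManagerTypes() {
  }

  static Kind resolve(Map<String, String> tableProperties) {
    // Same default as the PR: no property means plaintext.
    String type = tableProperties.getOrDefault(ENCRYPTION_MANAGER_TYPE, TYPE_PLAINTEXT);
    switch (type) {
      case TYPE_PLAINTEXT:
        return Kind.PLAINTEXT;
      case TYPE_SINGLE_ENVELOPE:
        return Kind.SINGLE_ENVELOPE;
      default:
        // "double.envelope" lands here like any other unsupported value,
        // so it needs no dedicated branch.
        throw new UnsupportedOperationException("Unsupported encryption manager type: " + type);
    }
  }
}
```

A caller such as `HadoopTableOperations.encryption()` would then only switch on the returned `Kind` and construct the concrete manager, keeping the property parsing in one reusable place.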
##########
File path: core/src/main/java/org/apache/iceberg/encryption/NativeEncryptedOutputFile.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import org.apache.iceberg.io.OutputFile;
+
+class NativeEncryptedOutputFile implements EncryptedOutputFile {
+
+ private final OutputFile rawOutput;
+ private final NativeFileEncryptParameters nativeEncryptionParameteres;
Review comment:
Typo: should be "nativeEncryptionParameters".
##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -228,4 +229,35 @@ private TableProperties() {
public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+ /**
+ * Encryption manager type
+ */
+ public static final String ENCRYPTION_MANAGER_TYPE = "encryption.manager.type";
+ public static final String ENCRYPTION_MANAGER_TYPE_PLAINTEXT = "plaintext";
+ public static final String ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE = "envelope";
+ public static final String ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE = "double.envelope";
+
+ public static final String ENCRYPTION_TABLE_KEY = "encryption.table.key";
+ // HIVE-21848 format
Review comment:
It'd be nice to give format examples.
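For example, assuming HIVE-21848 refers to the `<masterKeyId>:<column>,<column>;<masterKeyId>:<column>` syntax that Parquet's `parquet.encryption.column.keys` also documents, a value like `keyA:ssn,email;keyB:credit_card` would encrypt `ssn` and `email` with `keyA` and `credit_card` with `keyB`. A minimal parsing sketch; `ColumnKeysParser` is a hypothetical helper, not part of the PR:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: parses a HIVE-21848-style column keys value. */
final class ColumnKeysParser {
  private ColumnKeysParser() {
  }

  /** Parses "keyA:col1,col2;keyB:col3" into a column name -> master key ID map. */
  static Map<String, String> parse(String columnKeys) {
    Map<String, String> columnToKey = new LinkedHashMap<>();
    for (String entry : columnKeys.split(";")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue; // tolerate a trailing ';'
      }
      int colon = trimmed.indexOf(':');
      if (colon <= 0) {
        throw new IllegalArgumentException("Expected <keyId>:<columns> but got: " + trimmed);
      }
      String keyId = trimmed.substring(0, colon).trim();
      for (String column : trimmed.substring(colon + 1).split(",")) {
        String name = column.trim();
        if (name.isEmpty()) {
          throw new IllegalArgumentException("Empty column name in: " + trimmed);
        }
        // Design choice of this sketch: a column may be bound to only one key.
        if (columnToKey.put(name, keyId) != null) {
          throw new IllegalArgumentException("Column assigned to more than one key: " + name);
        }
      }
    }
    return columnToKey;
  }
}
```

Rejecting a column that appears under two keys is a choice made here for safety; the PR does not say how such a conflict should be handled.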
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
##########
@@ -96,14 +109,24 @@
this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
this.metricsConfig = metricsConfig;
this.columnIndexTruncateLength = conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+ this.rowGroupOrdinal = 0;
try {
this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
- writeMode, rowGroupSize, 0);
+ writeMode, rowGroupSize, 0, columnIndexTruncateLength,
+ // TODO this value is used in current Iceberg. Get from conf instead?
+ ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+ // TODO this value is used in current Iceberg. Get from conf instead?
Review comment:
Do we allow users to set these properties? If so, we should read them
from conf; if not, the current logic is fine.
##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
##########
@@ -82,6 +95,104 @@ public TableMetadata current() {
return currentMetadata;
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public EncryptionManager encryption() {
+ // TODO run by single thread? or synchronize?
+ if (null != encryptionManager) {
+ return encryptionManager;
+ }
+
+ TableMetadata tableMetadata = current();
+ Map<String, String> tableProperties = tableMetadata.properties();
+
+ String keyManagerType = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_MANAGER_TYPE, TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT);
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) {
+ encryptionManager = new PlaintextEncryptionManager();
+ return encryptionManager;
+ }
+
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE) ||
+ keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE)) {
+ boolean doubleWrap = keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE);
+ if (doubleWrap) {
+ throw new RuntimeException("Double envelope encryption is not supported yet");
+ }
+
+ Schema tableSchema = tableMetadata.schema();
+
+ String tableKeyId = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_TABLE_KEY, null);
+ if (null == tableKeyId) {
+ throw new RuntimeException("Table encryption key is not specified");
+ }
+
+ boolean pushdown = PropertyUtil.propertyAsBoolean(tableProperties,
+ TableProperties.ENCRYPTION_PUSHDOWN_ENABLED, TableProperties.ENCRYPTION_PUSHDOWN_ENABLED_DEFAULT);
+ // TODO since TableProperties.DEFAULT_FILE_FORMAT are overwritten eg in Spark,
+ // TODO check for pushdown in each data format and throw unsupported exception in Avro
+
+ String dataEncryptionAlgorithm = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_DATA_ALGORITHM, TableProperties.ENCRYPTION_DATA_ALGORITHM_DEFAULT);
+
+ EnvelopeConfig.Builder dataFileConfBuilder = EnvelopeConfig.builderFor(tableSchema)
+ .singleWrap(tableKeyId)
+ .useAlgorithm(EncryptionAlgorithm.valueOf(dataEncryptionAlgorithm));
+
+ String columnKeysProperty = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_COLUMN_KEYS, null);
+
+ if (null != columnKeysProperty) {
+ if (!pushdown) {
+ throw new RuntimeException("Column-specific master keys are supported only in pushdown mode");
+ }
+
+ // TODO
+ throw new RuntimeException("Column-specific master keys are not supported yet");
Review comment:
Use `Preconditions.checkArgument()`?
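Applied to this block, the pushdown check becomes a precondition on user input, while the second case is a valid configuration that simply isn't implemented yet, which arguably fits `UnsupportedOperationException` better than a bare `RuntimeException`. The sketch below uses a local `checkArgument` with the same contract as Guava's `Preconditions.checkArgument(boolean, Object)` (throw `IllegalArgumentException` when the condition is false) only so it compiles without Guava; in Iceberg the relocated `Preconditions` import would be used instead. `ColumnKeyChecks` and `validateColumnKeys` are hypothetical names.

```java
/** Hypothetical helper showing the column-keys checks from encryption() as preconditions. */
final class ColumnKeyChecks {
  private ColumnKeyChecks() {
  }

  // Local stand-in with the contract of Guava's Preconditions.checkArgument(boolean, Object):
  // throws IllegalArgumentException with the given message when the condition is false.
  static void checkArgument(boolean expression, String errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  static void validateColumnKeys(String columnKeysProperty, boolean pushdown) {
    if (columnKeysProperty == null) {
      return; // no column-specific keys configured, nothing to check
    }
    // Invalid input: column keys require pushdown (native format) encryption.
    checkArgument(pushdown, "Column-specific master keys are supported only in pushdown mode");
    // Valid input, but not implemented yet.
    throw new UnsupportedOperationException("Column-specific master keys are not supported yet");
  }
}
```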
##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
##########
@@ -82,6 +95,104 @@ public TableMetadata current() {
return currentMetadata;
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public EncryptionManager encryption() {
+ // TODO run by single thread? or synchronize?
+ if (null != encryptionManager) {
+ return encryptionManager;
+ }
+
+ TableMetadata tableMetadata = current();
+ Map<String, String> tableProperties = tableMetadata.properties();
+
+ String keyManagerType = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_MANAGER_TYPE, TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT);
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) {
+ encryptionManager = new PlaintextEncryptionManager();
+ return encryptionManager;
+ }
+
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE) ||
+ keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE)) {
+ boolean doubleWrap = keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE);
+ if (doubleWrap) {
+ throw new RuntimeException("Double envelope encryption is not supported yet");
+ }
+
+ Schema tableSchema = tableMetadata.schema();
+
+ String tableKeyId = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_TABLE_KEY, null);
+ if (null == tableKeyId) {
+ throw new RuntimeException("Table encryption key is not specified");
+ }
+
+ boolean pushdown = PropertyUtil.propertyAsBoolean(tableProperties,
+ TableProperties.ENCRYPTION_PUSHDOWN_ENABLED, TableProperties.ENCRYPTION_PUSHDOWN_ENABLED_DEFAULT);
+ // TODO since TableProperties.DEFAULT_FILE_FORMAT are overwritten eg in Spark,
+ // TODO check for pushdown in each data format and throw unsupported exception in Avro
+
+ String dataEncryptionAlgorithm = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_DATA_ALGORITHM, TableProperties.ENCRYPTION_DATA_ALGORITHM_DEFAULT);
+
+ EnvelopeConfig.Builder dataFileConfBuilder = EnvelopeConfig.builderFor(tableSchema)
+ .singleWrap(tableKeyId)
+ .useAlgorithm(EncryptionAlgorithm.valueOf(dataEncryptionAlgorithm));
+
+ String columnKeysProperty = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_COLUMN_KEYS, null);
+
+ if (null != columnKeysProperty) {
+ if (!pushdown) {
+ throw new RuntimeException("Column-specific master keys are supported only in pushdown mode");
+ }
+
+ // TODO
+ throw new RuntimeException("Column-specific master keys are not supported yet");
+ }
+
+ EnvelopeConfig dataFileConfig = dataFileConfBuilder.build();
+
+ String kmsClientImpl = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_KMS_CLIENT_IMPL, null);
+
+ // Pass custom kms configuration from table and Hadoop properties
+ Map<String, String> kmsProperties = new HashMap<>();
+ for (Map.Entry<String, String> property : tableProperties.entrySet()) {
+ if (property.getKey().contains("kms.client")) { // TODO
+ kmsProperties.put(property.getKey(), property.getValue());
+ }
+ }
+
+ for (Map.Entry<String, String> property : conf) {
+ if (property.getKey().contains("kms.client")) { // TODO
+ kmsProperties.put(property.getKey(), property.getValue());
+ }
+ }
Review comment:
Would it be cleaner to put this logic into
`TableEnvelopeKeyManager.loadKmsClient` and pass it the table properties and
conf?
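One way to act on this is a small filtering helper that `TableEnvelopeKeyManager.loadKmsClient` could call with the table properties and conf. Only the `kms.client` substring match and the table-then-conf order come from the PR; the class and method names are hypothetical. Because Hadoop's `Configuration` implements `Iterable<Map.Entry<String, String>>` (which is what makes the PR's `for (Map.Entry<String, String> property : conf)` loop compile), it can be passed in directly.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that loadKmsClient could call instead of encryption() doing the filtering. */
final class KmsProperties {
  // Same substring match the PR uses (still marked TODO there).
  static final String KMS_CLIENT_MARKER = "kms.client";

  private KmsProperties() {
  }

  /**
   * Collects KMS client settings from table properties, then from the Hadoop conf.
   * Conf entries are copied last, so they override table properties with the same
   * key, matching the loop order in the PR.
   */
  static Map<String, String> collect(Map<String, String> tableProperties,
                                     Iterable<Map.Entry<String, String>> conf) {
    Map<String, String> kmsProperties = new HashMap<>();
    copyMatching(tableProperties.entrySet(), kmsProperties);
    copyMatching(conf, kmsProperties);
    return kmsProperties;
  }

  private static void copyMatching(Iterable<Map.Entry<String, String>> source,
                                   Map<String, String> target) {
    for (Map.Entry<String, String> property : source) {
      if (property.getKey().contains(KMS_CLIENT_MARKER)) {
        target.put(property.getKey(), property.getValue());
      }
    }
  }
}
```

Inside `loadKmsClient`, the call would then be something like `KmsProperties.collect(tableProperties, conf)`, leaving `encryption()` free of KMS-specific details.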
##########
File path: core/src/main/java/org/apache/iceberg/encryption/TableEnvelopeKeyManager.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class TableEnvelopeKeyManager implements EnvelopeKeyManager {
+ private final KmsClient kmsClient;
+ private final String tableKekID;
+ private final boolean doubleWrap;
+
+ private final boolean uniformEncryption;
+ private Set<String> columnNamesUniformEncryption;
+ private final int dataKeyLength;
+
+ private transient volatile SecureRandom workerRNG = null;
+
+ public TableEnvelopeKeyManager(KmsClient kmsClient, EnvelopeConfig dataEnvelopeConfig, boolean pushdown,
+ Schema schema, int dataKeyLength) {
+ this.kmsClient = kmsClient;
+ this.dataKeyLength = dataKeyLength;
+
+ if (dataEnvelopeConfig.isDoubleEnvelope()) {
+ throw new IllegalArgumentException("Double envelope encryption is not supported yet");
+ } else { // single wrapping
+ doubleWrap = false;
+ tableKekID = dataEnvelopeConfig.kekId();
+ Preconditions.checkArgument(tableKekID != null,
+ "Table key encryption key ID is not configured");
Review comment:
Nit: the line break isn't needed; this fits on one line.
##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
##########
@@ -82,6 +95,104 @@ public TableMetadata current() {
return currentMetadata;
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public EncryptionManager encryption() {
+ // TODO run by single thread? or synchronize?
+ if (null != encryptionManager) {
+ return encryptionManager;
+ }
+
+ TableMetadata tableMetadata = current();
+ Map<String, String> tableProperties = tableMetadata.properties();
+
+ String keyManagerType = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_MANAGER_TYPE, TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT);
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) {
+ encryptionManager = new PlaintextEncryptionManager();
+ return encryptionManager;
+ }
+
+ if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE) ||
+ keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE)) {
+ boolean doubleWrap = keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE);
+ if (doubleWrap) {
+ throw new RuntimeException("Double envelope encryption is not supported yet");
+ }
+
+ Schema tableSchema = tableMetadata.schema();
+
+ String tableKeyId = PropertyUtil.propertyAsString(tableProperties,
+ TableProperties.ENCRYPTION_TABLE_KEY, null);
Review comment:
Nit: no need for a line break.
##########
File path: api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
##########
@@ -31,6 +31,14 @@
*/
public interface EncryptedOutputFile {
+ /**
+ * Use flat filestream encryption (default) or pushdown to native format encryption
+ */
+ default boolean useNativeEncryption() {
Review comment:
rename it to "nativeEncryption"?
##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -228,4 +229,35 @@ private TableProperties() {
public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+ /**
+ * Encryption manager type
+ */
+ public static final String ENCRYPTION_MANAGER_TYPE = "encryption.manager.type";
+ public static final String ENCRYPTION_MANAGER_TYPE_PLAINTEXT = "plaintext";
+ public static final String ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE = "envelope";
+ public static final String ENCRYPTION_MANAGER_TYPE_DOUBLE_ENVELOPE = "double.envelope";
+
+ public static final String ENCRYPTION_TABLE_KEY = "encryption.table.key";
+ // HIVE-21848 format
+ public static final String ENCRYPTION_COLUMN_KEYS = "encryption.column.keys";
+
+ public static final String ENCRYPTION_DEK_LENGTH = "encryption.data.key.length";
+ public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16;
+
+ public static final String ENCRYPTION_DATA_ALGORITHM = "encryption.data.algorithm";
+ public static final String ENCRYPTION_DATA_ALGORITHM_DEFAULT = EncryptionAlgorithm.AES_GCM.toString();
+
+ /**
+ * Allow file format native encryption instead of
+ * encrypting the entire file through Iceberg encryption stream.
Review comment:
Nit: no need for a line break; the comment fits on one line.