This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new bb50ab97d3 Core: Support Avro file encryption with AES GCM streams (#9436)
bb50ab97d3 is described below
commit bb50ab97d377476bfa3a743448e4afabe619e764
Author: ggershinsky <[email protected]>
AuthorDate: Tue Jan 16 19:55:06 2024 +0200
Core: Support Avro file encryption with AES GCM streams (#9436)
---
.../main/java/org/apache/iceberg/avro/Avro.java | 9 -
.../iceberg/avro/TestEncryptedAvroFileSplit.java | 209 +++++++++++++++++++++
.../iceberg/encryption/EncryptionTestHelpers.java | 41 ++++
.../org/apache/iceberg/encryption/UnitestKMS.java | 38 ++++
4 files changed, 288 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java
b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 6aa3241c36..0eaa3f2d24 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -94,9 +94,6 @@ public class Avro {
}
public static WriteBuilder write(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
return new WriteBuilder(file.encryptingOutputFile());
}
@@ -282,9 +279,6 @@ public class Avro {
}
public static DataWriteBuilder writeData(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
return new DataWriteBuilder(file.encryptingOutputFile());
}
@@ -385,9 +379,6 @@ public class Avro {
}
public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
return new DeleteWriteBuilder(file.encryptingOutputFile());
}
diff --git
a/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
b/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
new file mode 100644
index 0000000000..9020a12302
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionTestHelpers;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestEncryptedAvroFileSplit {
+ private static final Schema SCHEMA =
+ new Schema(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.required(2, "data", Types.StringType.get()));
+
+ private static final EncryptionManager ENCRYPTION_MANAGER =
+ EncryptionTestHelpers.createEncryptionManager();
+
+ private static final int NUM_RECORDS = 100_000;
+
+ @TempDir Path temp;
+
+ public List<Record> expected = null;
+ public InputFile file = null;
+
+ @BeforeEach
+ public void writeDataFile() throws IOException {
+ this.expected = Lists.newArrayList();
+
+ OutputFile out = Files.localOutput(temp.toFile());
+
+ EncryptedOutputFile eOut = ENCRYPTION_MANAGER.encrypt(out);
+
+ try (FileAppender<Object> writer =
+ Avro.write(eOut)
+ .set(TableProperties.AVRO_COMPRESSION, "uncompressed")
+ .createWriterFunc(DataWriter::create)
+ .schema(SCHEMA)
+ .overwrite()
+ .build()) {
+
+ Record record = GenericRecord.create(SCHEMA);
+ for (long i = 0; i < NUM_RECORDS; i += 1) {
+        Record next = record.copy(ImmutableMap.of("id", i, "data", UUID.randomUUID().toString()));
+ expected.add(next);
+ writer.add(next);
+ }
+ }
+
+ EncryptedInputFile encryptedIn =
+ EncryptedFiles.encryptedInput(out.toInputFile(), eOut.keyMetadata());
+
+ this.file = ENCRYPTION_MANAGER.decrypt(encryptedIn);
+ }
+
+ @Test
+ public void testSplitDataSkipping() throws IOException {
+ long end = file.getLength();
+ long splitLocation = end / 2;
+
+ List<Record> firstHalf = readAvro(file, SCHEMA, 0, splitLocation);
+    assertThat(firstHalf.size()).as("First split should not be empty").isNotEqualTo(0);
+
+    List<Record> secondHalf = readAvro(file, SCHEMA, splitLocation + 1, end - splitLocation - 1);
+    assertThat(secondHalf.size()).as("Second split should not be empty").isNotEqualTo(0);
+
+ assertThat(firstHalf.size() + secondHalf.size())
+ .as("Total records should match expected")
+ .isEqualTo(expected.size());
+
+ for (int i = 0; i < firstHalf.size(); i += 1) {
+ assertThat(firstHalf.get(i)).isEqualTo(expected.get(i));
+ }
+
+ for (int i = 0; i < secondHalf.size(); i += 1) {
+      assertThat(secondHalf.get(i)).isEqualTo(expected.get(firstHalf.size() + i));
+ }
+ }
+
+ @Test
+ public void testPosField() throws IOException {
+ Schema projection =
+        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));
+
+ List<Record> records = readAvro(file, projection, 0, file.getLength());
+
+ for (int i = 0; i < expected.size(); i += 1) {
+ assertThat(records.get(i).getField(MetadataColumns.ROW_POSITION.name()))
+ .as("Field _pos should match")
+ .isEqualTo((long) i);
+
+ assertThat(records.get(i).getField("id"))
+ .as("Field id should match")
+ .isEqualTo(expected.get(i).getField("id"));
+
+ assertThat(records.get(i).getField("data"))
+ .as("Field data should match")
+ .isEqualTo(expected.get(i).getField("data"));
+ }
+ }
+
+ @Test
+ public void testPosFieldWithSplits() throws IOException {
+ Schema projection =
+        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));
+
+ long end = file.getLength();
+ long splitLocation = end / 2;
+
+ List<Record> secondHalf =
+ readAvro(file, projection, splitLocation + 1, end - splitLocation - 1);
+    assertThat(secondHalf.size()).as("Second split should not be empty").isNotEqualTo(0);
+
+ List<Record> firstHalf = readAvro(file, projection, 0, splitLocation);
+    assertThat(firstHalf.size()).as("First split should not be empty").isNotEqualTo(0);
+
+ assertThat(firstHalf.size() + secondHalf.size())
+ .as("Total records should match expected")
+ .isEqualTo(expected.size());
+
+ for (int i = 0; i < firstHalf.size(); i += 1) {
+      assertThat(firstHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()))
+ .as("Field _pos should match")
+ .isEqualTo((long) i);
+ assertThat(firstHalf.get(i).getField("id"))
+ .as("Field id should match")
+ .isEqualTo(expected.get(i).getField("id"));
+ assertThat(firstHalf.get(i).getField("data"))
+ .as("Field data should match")
+ .isEqualTo(expected.get(i).getField("data"));
+ }
+
+ for (int i = 0; i < secondHalf.size(); i += 1) {
+      assertThat(secondHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()))
+ .as("Field _pos should match")
+ .isEqualTo((long) (firstHalf.size() + i));
+ assertThat(secondHalf.get(i).getField("id"))
+ .as("Field id should match")
+ .isEqualTo(expected.get(firstHalf.size() + i).getField("id"));
+ assertThat(secondHalf.get(i).getField("data"))
+ .as("Field data should match")
+ .isEqualTo(expected.get(firstHalf.size() + i).getField("data"));
+ }
+ }
+
+ @Test
+ public void testPosWithEOFSplit() throws IOException {
+ Schema projection =
+        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));
+
+ long end = file.getLength();
+
+ List<Record> records = readAvro(file, projection, end - 10, 10);
+ assertThat(records.size()).as("Should not read any records").isEqualTo(0);
+ }
+
+  public List<Record> readAvro(InputFile in, Schema projection, long start, long length)
+ throws IOException {
+ try (AvroIterable<Record> reader =
+ Avro.read(in)
+ .createReaderFunc(DataReader::create)
+ .split(start, length)
+ .project(projection)
+ .build()) {
+ return Lists.newArrayList(reader);
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java
b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java
new file mode 100644
index 0000000000..aa49e1c40f
--- /dev/null
+++
b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class EncryptionTestHelpers {
+
+ private EncryptionTestHelpers() {}
+
+ public static EncryptionManager createEncryptionManager() {
+ Map<String, String> catalogProperties = Maps.newHashMap();
+ catalogProperties.put(
+        CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName());
+ Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1);
+ tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+
+ return EncryptionUtil.createEncryptionManager(
+ tableProperties, EncryptionUtil.createKmsClient(catalogProperties));
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java
b/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java
new file mode 100644
index 0000000000..52a0e36c00
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class UnitestKMS extends MemoryMockKMS {
+ public static final String MASTER_KEY_NAME1 = "keyA";
+  public static final byte[] MASTER_KEY1 = "0123456789012345".getBytes(StandardCharsets.UTF_8);
+ public static final String MASTER_KEY_NAME2 = "keyB";
+  public static final byte[] MASTER_KEY2 = "1123456789012345".getBytes(StandardCharsets.UTF_8);
+
+ @Override
+ public void initialize(Map<String, String> properties) {
+ masterKeys =
+ ImmutableMap.of(
+ MASTER_KEY_NAME1, MASTER_KEY1,
+ MASTER_KEY_NAME2, MASTER_KEY2);
+ }
+}