This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d6c8358ff2 API, Core: Support manifest encryption (#8252)
d6c8358ff2 is described below
commit d6c8358ff26957c9234580addb03a0db1e441c4d
Author: ggershinsky <[email protected]>
AuthorDate: Wed Mar 13 19:48:09 2024 +0200
API, Core: Support manifest encryption (#8252)
---
.../apache/iceberg/io/PositionOutputStream.java | 11 +
.../org/apache/iceberg/BaseRewriteManifests.java | 6 +-
.../main/java/org/apache/iceberg/FastAppend.java | 8 +-
.../java/org/apache/iceberg/ManifestFiles.java | 45 +++-
.../java/org/apache/iceberg/ManifestWriter.java | 18 +-
.../apache/iceberg/MergingSnapshotProducer.java | 8 +-
.../java/org/apache/iceberg/SnapshotProducer.java | 20 +-
.../main/java/org/apache/iceberg/V1Metadata.java | 10 +-
.../main/java/org/apache/iceberg/V2Metadata.java | 3 +-
.../org/apache/iceberg/avro/AvroFileAppender.java | 2 +-
.../iceberg/encryption/AesGcmOutputStream.java | 5 +
.../apache/iceberg/encryption/EncryptedFiles.java | 4 +
.../org/apache/iceberg/TestManifestEncryption.java | 247 +++++++++++++++++++++
13 files changed, 358 insertions(+), 29 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
b/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
index b80549fbde..5092863809 100644
--- a/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
+++ b/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
@@ -29,4 +29,15 @@ public abstract class PositionOutputStream extends
OutputStream {
* @throws IOException If the underlying stream throws IOException
*/
public abstract long getPos() throws IOException;
+
+ /**
+ * Return the current stored length of the output. Can differ from the current position for
+ * encrypting streams, and for other non-length-preserving streams.
+ *
+ * <p>The default implementation returns {@link #getPos()}; streams whose stored bytes differ
+ * from the bytes written to them should override this method.
+ *
+ * @return current stored length in bytes
+ * @throws IOException If the underlying stream throws IOException
+ */
+ public long storedLength() throws IOException {
+ return getPos();
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index c70dda2bd6..e8fbfef2ca 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -32,10 +32,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,8 +154,8 @@ public class BaseRewriteManifests extends
SnapshotProducer<RewriteManifests>
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
- InputFile toCopy = ops.io().newInputFile(manifest.path());
- OutputFile newFile = newManifestOutput();
+ InputFile toCopy = ops.io().newInputFile(manifest);
+ EncryptedOutputFile newFile = newManifestOutputFile();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(),
manifest.partitionSpecId(),
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java
b/core/src/main/java/org/apache/iceberg/FastAppend.java
index e0919d9c7b..14e776a92d 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -22,11 +22,11 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -121,14 +121,14 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
+ /**
+ * Copies {@code manifest} into a new manifest file obtained from {@link #newManifestOutputFile()},
+ * passing this snapshot's ID and {@code summaryBuilder} to {@link ManifestFiles#copyAppendManifest}.
+ */
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
- InputFile toCopy = ops.io().newInputFile(manifest.path());
- OutputFile newManifestPath = newManifestOutput();
+ // open the source by ManifestFile rather than by path; presumably this lets the FileIO use the
+ // manifest's metadata (e.g. key metadata) when reading it -- confirm against FileIO
+ InputFile toCopy = ops.io().newInputFile(manifest);
+ EncryptedOutputFile newManifestFile = newManifestOutputFile();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
manifest.partitionSpecId(),
toCopy,
current.specsById(),
- newManifestPath,
+ newManifestFile,
snapshotId(),
summaryBuilder);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index f7b1add6bd..9009f19ec9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.iceberg.ManifestReader.FileType;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ContentCache;
@@ -157,11 +159,29 @@ public class ManifestFiles {
*/
public static ManifestWriter<DataFile> write(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long
snapshotId) {
+ // wrap the plain file with empty key metadata and delegate to the EncryptedOutputFile overload
+ return write(
+ formatVersion, spec,
EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
+ }
+
+ /**
+ * Create a new {@link ManifestWriter} for the given format version.
+ *
+ * @param formatVersion a target format version
+ * @param spec a {@link PartitionSpec}
+ * @param encryptedOutputFile an {@link EncryptedOutputFile} where the
manifest will be written
+ * @param snapshotId a snapshot ID for the manifest entries, or null for an
inherited ID
+ * @return a manifest writer
+ */
+ public static ManifestWriter<DataFile> write(
+ int formatVersion,
+ PartitionSpec spec,
+ EncryptedOutputFile encryptedOutputFile,
+ Long snapshotId) {
switch (formatVersion) {
case 1:
- return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
+ return new ManifestWriter.V1Writer(spec, encryptedOutputFile,
snapshotId);
case 2:
- return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
+ return new ManifestWriter.V2Writer(spec, encryptedOutputFile,
snapshotId);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
@@ -198,6 +218,21 @@ public class ManifestFiles {
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long
snapshotId) {
+ // wrap the plain file with empty key metadata and delegate to the EncryptedOutputFile overload
+ return writeDeleteManifest(
+ formatVersion, spec,
EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
+ }
+
+ /**
+ * Create a new {@link ManifestWriter} for the given format version.
+ *
+ * @param formatVersion a target format version
+ * @param spec a {@link PartitionSpec}
+ * @param outputFile an {@link EncryptedOutputFile} where the manifest will
be written
+ * @param snapshotId a snapshot ID for the manifest entries, or null for an
inherited ID
+ * @return a manifest writer
+ */
+ public static ManifestWriter<DeleteFile> writeDeleteManifest(
+ int formatVersion, PartitionSpec spec, EncryptedOutputFile outputFile,
Long snapshotId) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1
table");
@@ -254,7 +289,7 @@ public class ManifestFiles {
int specId,
InputFile toCopy,
Map<Integer, PartitionSpec> specsById,
- OutputFile outputFile,
+ EncryptedOutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
// use metadata that will add the current snapshot's ID for the rewrite
@@ -278,7 +313,7 @@ public class ManifestFiles {
int specId,
InputFile toCopy,
Map<Integer, PartitionSpec> specsById,
- OutputFile outputFile,
+ EncryptedOutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
// for a rewritten manifest all snapshot ids should be set. use empty
metadata to throw an
@@ -302,7 +337,7 @@ public class ManifestFiles {
private static ManifestFile copyManifestInternal(
int formatVersion,
ManifestReader<DataFile> reader,
- OutputFile outputFile,
+ EncryptedOutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 4865ccfc3b..cea907ddac 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -19,7 +19,9 @@
package org.apache.iceberg;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
@@ -37,6 +39,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static final long UNASSIGNED_SEQ = -1L;
private final OutputFile file;
+ private final ByteBuffer keyMetadataBuffer;
private final int specId;
private final FileAppender<ManifestEntry<F>> writer;
private final Long snapshotId;
@@ -52,13 +55,14 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
private long deletedRows = 0L;
private Long minDataSequenceNumber = null;
- private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId)
{
- this.file = file;
+ /**
+ * Creates a writer that appends manifest entries through {@code file}'s encrypting output file.
+ *
+ * @param spec partition spec of the entries; its spec ID and partition type are captured here
+ * @param file destination file; plain files can be adapted with
+ * EncryptedFiles.plainAsEncryptedOutput
+ * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+ */
+ private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long
snapshotId) {
+ this.file = file.encryptingOutputFile();
this.specId = spec.specId();
- this.writer = newAppender(spec, file);
+ this.writer = newAppender(spec, this.file);
this.snapshotId = snapshotId;
this.reused = new GenericManifestEntry<>(spec.partitionType());
this.stats = new PartitionSummary(spec);
+ // Key metadata is recorded on the ManifestFile this writer produces so the manifest can be
+ // decrypted when read back; null when the file has no key metadata.
+ // NOTE(review): EncryptionKeyMetadata.EMPTY (used for wrapped plain files) is not null, so plain
+ // manifests presumably record an empty buffer rather than null here -- confirm readers treat
+ // an empty buffer the same as null.
+ this.keyMetadataBuffer = (file.keyMetadata() == null) ? null :
file.keyMetadata().buffer();
}
protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
@@ -204,7 +208,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
deletedFiles,
deletedRows,
stats.summaries(),
- null);
+ keyMetadataBuffer);
}
@Override
@@ -216,7 +220,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V2Writer extends ManifestWriter<DataFile> {
private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;
- V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+ /** Creates a v2 data-file manifest writer; {@code file} may be encrypted or a wrapped plain file. */
+ V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId,
spec.partitionType());
}
@@ -250,7 +254,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;
- V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+ /** Creates a v2 delete-file manifest writer; {@code file} may be encrypted or a wrapped plain file. */
+ V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long
snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId,
spec.partitionType());
}
@@ -289,7 +293,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V1Writer extends ManifestWriter<DataFile> {
private final V1Metadata.IndexedManifestEntry entryWrapper;
- V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+ /** Creates a v1 data-file manifest writer; {@code file} may be encrypted or a wrapped plain file. */
+ V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new
V1Metadata.IndexedManifestEntry(spec.partitionType());
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 5d3ec6e35f..c1dc6b58b7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
@@ -38,7 +39,6 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -275,14 +275,14 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
+ /**
+ * Copies {@code manifest} into a new manifest file obtained from {@link #newManifestOutputFile()},
+ * passing this snapshot's ID and {@code appendedManifestsSummary} to
+ * {@link ManifestFiles#copyAppendManifest}.
+ */
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
- InputFile toCopy = ops.io().newInputFile(manifest.path());
- OutputFile newManifestPath = newManifestOutput();
+ // open the source by ManifestFile rather than by path; presumably this lets the FileIO use the
+ // manifest's metadata (e.g. key metadata) when reading it -- confirm against FileIO
+ InputFile toCopy = ops.io().newInputFile(manifest);
+ EncryptedOutputFile newManifestFile = newManifestOutputFile();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
manifest.partitionSpecId(),
toCopy,
current.specsById(),
- newManifestPath,
+ newManifestFile,
snapshotId(),
appendedManifestsSummary);
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 757d0b78bc..1c4491a684 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -43,6 +43,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.exceptions.CleanableFailure;
@@ -255,7 +257,6 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
.run(index -> manifestFiles[index] =
manifestsWithMetadata.get(manifests.get(index)));
writer.addAll(Arrays.asList(manifestFiles));
-
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write manifest list file");
}
@@ -499,6 +500,11 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
"snap-%d-%d-%s", snapshotId(),
attempt.incrementAndGet(), commitUUID))));
}
+ /**
+ * @deprecated will be removed in 1.7.0; Use {@link
SnapshotProducer#newManifestOutputFile}
+ * instead
+ */
+ @Deprecated
protected OutputFile newManifestOutput() {
return ops.io()
.newOutputFile(
@@ -506,14 +512,22 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
FileFormat.AVRO.addExtension(commitUUID + "-m" +
manifestCount.getAndIncrement())));
}
+  /**
+   * Creates the output file for the next manifest written by this commit.
+   *
+   * <p>The file name is built from the commit UUID and the next value of the manifest counter, is
+   * placed at a metadata file location, and is opened through an {@link EncryptingFileIO} that
+   * combines the table's {@code FileIO} with its encryption manager.
+   *
+   * @return an {@link EncryptedOutputFile} for the new manifest
+   */
+  protected EncryptedOutputFile newManifestOutputFile() {
+    String manifestName = commitUUID + "-m" + manifestCount.getAndIncrement();
+    String location = ops.metadataFileLocation(FileFormat.AVRO.addExtension(manifestName));
+    EncryptingFileIO encryptingIO = EncryptingFileIO.combine(ops.io(), ops.encryption());
+    return encryptingIO.newEncryptingOutputFile(location);
+  }
+
+ /**
+ * Creates a data-file manifest writer for {@code spec} at the table's current format version,
+ * writing to a file from {@link #newManifestOutputFile()}.
+ */
protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
return ManifestFiles.write(
- ops.current().formatVersion(), spec, newManifestOutput(),
snapshotId());
+ ops.current().formatVersion(), spec, newManifestOutputFile(),
snapshotId());
}
+ /**
+ * Creates a delete-file manifest writer for {@code spec} at the table's current format version,
+ * writing to a file from {@link #newManifestOutputFile()}.
+ */
protected ManifestWriter<DeleteFile> newDeleteManifestWriter(PartitionSpec
spec) {
return ManifestFiles.writeDeleteManifest(
- ops.current().formatVersion(), spec, newManifestOutput(),
snapshotId());
+ ops.current().formatVersion(), spec, newManifestOutputFile(),
snapshotId());
}
protected RollingManifestWriter<DataFile>
newRollingManifestWriter(PartitionSpec spec) {
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java
b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index fddcee0374..30b04cd731 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -43,7 +43,8 @@ class V1Metadata {
ManifestFile.PARTITION_SUMMARIES,
ManifestFile.ADDED_ROWS_COUNT,
ManifestFile.EXISTING_ROWS_COUNT,
- ManifestFile.DELETED_ROWS_COUNT);
+ ManifestFile.DELETED_ROWS_COUNT,
+ ManifestFile.KEY_METADATA);
/**
* A wrapper class to write any ManifestFile implementation to Avro using
the v1 schema.
@@ -97,6 +98,8 @@ class V1Metadata {
return existingRowsCount();
case 10:
return deletedRowsCount();
+ case 11:
+ return keyMetadata();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " +
pos);
}
@@ -187,6 +190,11 @@ class V1Metadata {
return wrapped.partitions();
}
+ // delegate to the wrapped manifest so get() can return key metadata at ordinal 11 (KEY_METADATA)
+ @Override
+ public ByteBuffer keyMetadata() {
+ return wrapped.keyMetadata();
+ }
+
@Override
public ManifestFile copy() {
return wrapped.copy();
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java
b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index ba6bb4a319..8f3b71d399 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -47,7 +47,8 @@ class V2Metadata {
ManifestFile.ADDED_ROWS_COUNT.asRequired(),
ManifestFile.EXISTING_ROWS_COUNT.asRequired(),
ManifestFile.DELETED_ROWS_COUNT.asRequired(),
- ManifestFile.PARTITION_SUMMARIES);
+ ManifestFile.PARTITION_SUMMARIES,
+ ManifestFile.KEY_METADATA);
/**
* A wrapper class to write any ManifestFile implementation to Avro using
the v2 write schema.
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
index 47e3c2c284..c69a8626f6 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
@@ -80,7 +80,7 @@ class AvroFileAppender<D> implements FileAppender<D> {
public long length() {
if (stream != null) {
try {
- return stream.getPos();
+ return stream.storedLength();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to get stream length");
}
diff --git
a/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
b/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
index b4f723cca3..da437b7540 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
@@ -129,6 +129,11 @@ public class AesGcmOutputStream extends
PositionOutputStream {
targetStream.close();
}
+ // targetStream receives the bytes actually persisted (including HEADER_BYTES from writeHeader),
+ // so its stored length, not this stream's position, is the size of the stored file.
+ @Override
+ public long storedLength() throws IOException {
+ return targetStream.storedLength();
+ }
+
private void writeHeader() throws IOException {
targetStream.write(HEADER_BYTES);
isHeaderWritten = true;
diff --git
a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
index c0fc41ca13..b0e2e74d93 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
@@ -57,5 +57,9 @@ public class EncryptedFiles {
encryptedOutputFile,
BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
}
+  /**
+   * Wraps a plaintext {@link OutputFile} as an {@link EncryptedOutputFile} whose key metadata is
+   * {@link EncryptionKeyMetadata#EMPTY}, so APIs that accept an {@link EncryptedOutputFile} can
+   * also write unencrypted files.
+   *
+   * @param plainOutputFile the unencrypted output file to wrap; it is passed to the returned
+   *     {@link EncryptedOutputFile} unchanged
+   * @return an {@link EncryptedOutputFile} backed by {@code plainOutputFile} with empty key metadata
+   */
+  public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile plainOutputFile) {
+    return new BaseEncryptedOutputFile(plainOutputFile, EncryptionKeyMetadata.EMPTY);
+  }
+
private EncryptedFiles() {}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
new file mode 100644
index 0000000000..b64324ec57
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.avro.InvalidAvroMagicException;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptingFileIO;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionTestHelpers;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestManifestEncryption {
+ private static final FileIO FILE_IO = new TestTables.LocalFileIO();
+
+ private static final Schema SCHEMA =
+ new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "timestamp", Types.TimestampType.withZone()),
+ required(3, "category", Types.StringType.get()),
+ required(4, "data", Types.StringType.get()),
+ required(5, "double", Types.DoubleType.get()));
+
+ private static final PartitionSpec SPEC =
+ PartitionSpec.builderFor(SCHEMA)
+ .identity("category")
+ .hour("timestamp")
+ .bucket("id", 16)
+ .build();
+
+ private static final long SNAPSHOT_ID = 987134631982734L;
+ private static final String PATH =
+
"s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro";
+ private static final FileFormat FORMAT = FileFormat.AVRO;
+ private static final PartitionData PARTITION =
+ DataFiles.data(SPEC, "category=cheesy/timestamp_hour=10/id_bucket=3");
+ private static final Metrics METRICS =
+ new Metrics(
+ 1587L,
+ ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), //
sizes
+ ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), //
value counts
+ ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value
counts
+ ImmutableMap.of(5, 10L), // nan value counts
+ ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(),
1)), // lower bounds
+ ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(),
1))); // upper bounds
+ private static final List<Long> OFFSETS = ImmutableList.of(4L);
+ private static final Integer SORT_ORDER_ID = 2;
+
+ private static final ByteBuffer CONTENT_KEY_METADATA =
ByteBuffer.allocate(100);
+
+ private static final DataFile DATA_FILE =
+ new GenericDataFile(
+ 0,
+ PATH,
+ FORMAT,
+ PARTITION,
+ 150972L,
+ METRICS,
+ CONTENT_KEY_METADATA,
+ OFFSETS,
+ SORT_ORDER_ID);
+
+ private static final List<Integer> EQUALITY_IDS = ImmutableList.of(1);
+ private static final int[] EQUALITY_ID_ARR = new int[] {1};
+
+ private static final DeleteFile DELETE_FILE =
+ new GenericDeleteFile(
+ SPEC.specId(),
+ FileContent.EQUALITY_DELETES,
+ PATH,
+ FORMAT,
+ PARTITION,
+ 22905L,
+ METRICS,
+ EQUALITY_ID_ARR,
+ SORT_ORDER_ID,
+ null,
+ CONTENT_KEY_METADATA);
+
+ private static final EncryptionManager ENCRYPTION_MANAGER =
+ EncryptionTestHelpers.createEncryptionManager();
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void testV1Write() throws IOException {
+ ManifestFile manifest = writeManifest(1);
+ checkEntry(
+ readManifest(manifest),
+ ManifestWriter.UNASSIGNED_SEQ,
+ ManifestWriter.UNASSIGNED_SEQ,
+ FileContent.DATA);
+ }
+
+ @Test
+ public void testV2Write() throws IOException {
+ ManifestFile manifest = writeManifest(2);
+ checkEntry(
+ readManifest(manifest),
+ ManifestWriter.UNASSIGNED_SEQ,
+ ManifestWriter.UNASSIGNED_SEQ,
+ FileContent.DATA);
+ }
+
+ @Test
+ public void testV2WriteDelete() throws IOException {
+ ManifestFile manifest = writeDeleteManifest(2);
+ checkEntry(
+ readDeleteManifest(manifest),
+ ManifestWriter.UNASSIGNED_SEQ,
+ ManifestWriter.UNASSIGNED_SEQ,
+ FileContent.EQUALITY_DELETES);
+ }
+
+ void checkEntry(
+ ManifestEntry<?> entry,
+ Long expectedDataSequenceNumber,
+ Long expectedFileSequenceNumber,
+ FileContent content) {
+ Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status());
+ Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
+ Assert.assertEquals(
+ "Data sequence number", expectedDataSequenceNumber,
entry.dataSequenceNumber());
+ Assert.assertEquals(
+ "File sequence number", expectedFileSequenceNumber,
entry.fileSequenceNumber());
+ checkDataFile(entry.file(), content);
+ }
+
+ void checkDataFile(ContentFile<?> dataFile, FileContent content) {
+ // DataFile is the superclass of DeleteFile, so this method can check both
+ Assert.assertEquals("Content", content, dataFile.content());
+ Assert.assertEquals("Path", PATH, dataFile.path());
+ Assert.assertEquals("Format", FORMAT, dataFile.format());
+ Assert.assertEquals("Partition", PARTITION, dataFile.partition());
+ Assert.assertEquals("Record count", METRICS.recordCount(), (Long)
dataFile.recordCount());
+ Assert.assertEquals("Column sizes", METRICS.columnSizes(),
dataFile.columnSizes());
+ Assert.assertEquals("Value counts", METRICS.valueCounts(),
dataFile.valueCounts());
+ Assert.assertEquals("Null value counts", METRICS.nullValueCounts(),
dataFile.nullValueCounts());
+ Assert.assertEquals("NaN value counts", METRICS.nanValueCounts(),
dataFile.nanValueCounts());
+ Assert.assertEquals("Lower bounds", METRICS.lowerBounds(),
dataFile.lowerBounds());
+ Assert.assertEquals("Upper bounds", METRICS.upperBounds(),
dataFile.upperBounds());
+ Assert.assertEquals("Sort order id", SORT_ORDER_ID,
dataFile.sortOrderId());
+ if (dataFile.content() == FileContent.EQUALITY_DELETES) {
+ Assert.assertEquals(EQUALITY_IDS, dataFile.equalityFieldIds());
+ } else {
+ Assert.assertNull(dataFile.equalityFieldIds());
+ }
+ }
+
+ private ManifestFile writeManifest(int formatVersion) throws IOException {
+ return writeManifest(DATA_FILE, formatVersion);
+ }
+
+ private ManifestFile writeManifest(DataFile file, int formatVersion) throws
IOException {
+ OutputFile manifestFile =
+
Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+ EncryptedOutputFile encryptedManifest =
ENCRYPTION_MANAGER.encrypt(manifestFile);
+ ManifestWriter<DataFile> writer =
+ ManifestFiles.write(formatVersion, SPEC, encryptedManifest,
SNAPSHOT_ID);
+ try {
+ writer.add(file);
+ } finally {
+ writer.close();
+ }
+ return writer.toManifestFile();
+ }
+
+ private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws
IOException {
+ // First try to read without decryption
+ Assertions.assertThatThrownBy(
+ () ->
+ ManifestFiles.read(
+ manifest,
+ EncryptingFileIO.combine(FILE_IO,
PlaintextEncryptionManager.instance()),
+ null))
+ .isInstanceOf(RuntimeIOException.class)
+ .hasMessageContaining("Failed to open file")
+ .hasCauseInstanceOf(InvalidAvroMagicException.class);
+
+ try (CloseableIterable<ManifestEntry<DataFile>> reader =
+ ManifestFiles.read(manifest, EncryptingFileIO.combine(FILE_IO,
ENCRYPTION_MANAGER), null)
+ .entries()) {
+ List<ManifestEntry<DataFile>> files = Lists.newArrayList(reader);
+ Assert.assertEquals("Should contain only one data file", 1,
files.size());
+ return files.get(0);
+ }
+ }
+
+ private ManifestFile writeDeleteManifest(int formatVersion) throws
IOException {
+ OutputFile manifestFile =
+
Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+ EncryptedOutputFile encryptedManifest =
ENCRYPTION_MANAGER.encrypt(manifestFile);
+ ManifestWriter<DeleteFile> writer =
+ ManifestFiles.writeDeleteManifest(formatVersion, SPEC,
encryptedManifest, SNAPSHOT_ID);
+ try {
+ writer.add(DELETE_FILE);
+ } finally {
+ writer.close();
+ }
+ return writer.toManifestFile();
+ }
+
+ private ManifestEntry<DeleteFile> readDeleteManifest(ManifestFile manifest)
throws IOException {
+ try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
+ ManifestFiles.readDeleteManifest(
+ manifest, EncryptingFileIO.combine(FILE_IO,
ENCRYPTION_MANAGER), null)
+ .entries()) {
+ List<ManifestEntry<DeleteFile>> entries = Lists.newArrayList(reader);
+ Assert.assertEquals("Should contain only one delete file", 1,
entries.size());
+ return entries.get(0);
+ }
+ }
+}