This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d6c8358ff2 API, Core: Support manifest encryption (#8252)
d6c8358ff2 is described below

commit d6c8358ff26957c9234580addb03a0db1e441c4d
Author: ggershinsky <[email protected]>
AuthorDate: Wed Mar 13 19:48:09 2024 +0200

    API, Core: Support manifest encryption (#8252)
---
 .../apache/iceberg/io/PositionOutputStream.java    |  11 +
 .../org/apache/iceberg/BaseRewriteManifests.java   |   6 +-
 .../main/java/org/apache/iceberg/FastAppend.java   |   8 +-
 .../java/org/apache/iceberg/ManifestFiles.java     |  45 +++-
 .../java/org/apache/iceberg/ManifestWriter.java    |  18 +-
 .../apache/iceberg/MergingSnapshotProducer.java    |   8 +-
 .../java/org/apache/iceberg/SnapshotProducer.java  |  20 +-
 .../main/java/org/apache/iceberg/V1Metadata.java   |  10 +-
 .../main/java/org/apache/iceberg/V2Metadata.java   |   3 +-
 .../org/apache/iceberg/avro/AvroFileAppender.java  |   2 +-
 .../iceberg/encryption/AesGcmOutputStream.java     |   5 +
 .../apache/iceberg/encryption/EncryptedFiles.java  |   4 +
 .../org/apache/iceberg/TestManifestEncryption.java | 247 +++++++++++++++++++++
 13 files changed, 358 insertions(+), 29 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java b/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
index b80549fbde..5092863809 100644
--- a/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
+++ b/api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
@@ -29,4 +29,15 @@ public abstract class PositionOutputStream extends OutputStream {
    * @throws IOException If the underlying stream throws IOException
    */
   public abstract long getPos() throws IOException;
+
+  /**
+   * Return the current stored length of the output. Can differ from the current position for
+   * encrypting streams, and for other non-length-preserving streams.
+   *
+   * <p>The default implementation returns {@link #getPos()}, which is only correct for streams
+   * that store exactly the bytes they are given. Streams that transform the data before storing
+   * it (for example, by encrypting it and adding headers) must override this method to report
+   * the number of bytes actually written to the underlying storage.
+   *
+   * @return current stored length in bytes
+   * @throws IOException If the underlying stream throws IOException
+   */
+  public long storedLength() throws IOException {
+    return getPos();
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index c70dda2bd6..e8fbfef2ca 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -32,10 +32,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,8 +154,8 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
 
   private ManifestFile copyManifest(ManifestFile manifest) {
     TableMetadata current = ops.current();
-    InputFile toCopy = ops.io().newInputFile(manifest.path());
-    OutputFile newFile = newManifestOutput();
+    InputFile toCopy = ops.io().newInputFile(manifest);
+    EncryptedOutputFile newFile = newManifestOutputFile();
     return ManifestFiles.copyRewriteManifest(
         current.formatVersion(),
         manifest.partitionSpecId(),
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index e0919d9c7b..14e776a92d 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -121,14 +121,14 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
 
   private ManifestFile copyManifest(ManifestFile manifest) {
+    // Copies an appended manifest into a new manifest file written with this producer's snapshot ID.
     TableMetadata current = ops.current();
-    InputFile toCopy = ops.io().newInputFile(manifest.path());
-    OutputFile newManifestPath = newManifestOutput();
+    // Open the source from the ManifestFile itself (not only its path) so the FileIO also sees
+    // the manifest's metadata, including its key metadata, when opening the file.
+    InputFile toCopy = ops.io().newInputFile(manifest);
+    // The copy is created through the table's encryption manager (see newManifestOutputFile).
+    EncryptedOutputFile newManifestFile = newManifestOutputFile();
     return ManifestFiles.copyAppendManifest(
         current.formatVersion(),
         manifest.partitionSpecId(),
         toCopy,
         current.specsById(),
-        newManifestPath,
+        newManifestFile,
         snapshotId(),
         summaryBuilder);
   }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index f7b1add6bd..9009f19ec9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import org.apache.iceberg.ManifestReader.FileType;
 import org.apache.iceberg.avro.AvroEncoderUtil;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.ContentCache;
@@ -157,11 +159,29 @@ public class ManifestFiles {
    */
   public static ManifestWriter<DataFile> write(
       int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+    // Plain OutputFile: wrap it with empty key metadata and delegate to the
+    // EncryptedOutputFile overload so both entry points share one implementation.
+    return write(
+        formatVersion, spec, EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
+  }
+
+  /**
+   * Create a new {@link ManifestWriter} for the given format version.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param encryptedOutputFile an {@link EncryptedOutputFile} where the 
manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an 
inherited ID
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DataFile> write(
+      int formatVersion,
+      PartitionSpec spec,
+      EncryptedOutputFile encryptedOutputFile,
+      Long snapshotId) {
     switch (formatVersion) {
       case 1:
-        return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
+        return new ManifestWriter.V1Writer(spec, encryptedOutputFile, 
snapshotId);
       case 2:
-        return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
+        return new ManifestWriter.V2Writer(spec, encryptedOutputFile, 
snapshotId);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest for table version: " + formatVersion);
@@ -198,6 +218,21 @@ public class ManifestFiles {
    */
   public static ManifestWriter<DeleteFile> writeDeleteManifest(
       int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+    // Plain OutputFile: wrap it with empty key metadata and delegate to the
+    // EncryptedOutputFile overload, which holds the format-version handling.
+    return writeDeleteManifest(
+        formatVersion, spec, EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
+  }
+
+  /**
+   * Create a new {@link ManifestWriter} for the given format version.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link EncryptedOutputFile} where the manifest will 
be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an 
inherited ID
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(
+      int formatVersion, PartitionSpec spec, EncryptedOutputFile outputFile, 
Long snapshotId) {
     switch (formatVersion) {
       case 1:
         throw new IllegalArgumentException("Cannot write delete files in a v1 
table");
@@ -254,7 +289,7 @@ public class ManifestFiles {
       int specId,
       InputFile toCopy,
       Map<Integer, PartitionSpec> specsById,
-      OutputFile outputFile,
+      EncryptedOutputFile outputFile,
       long snapshotId,
       SnapshotSummary.Builder summaryBuilder) {
     // use metadata that will add the current snapshot's ID for the rewrite
@@ -278,7 +313,7 @@ public class ManifestFiles {
       int specId,
       InputFile toCopy,
       Map<Integer, PartitionSpec> specsById,
-      OutputFile outputFile,
+      EncryptedOutputFile outputFile,
       long snapshotId,
       SnapshotSummary.Builder summaryBuilder) {
     // for a rewritten manifest all snapshot ids should be set. use empty 
metadata to throw an
@@ -302,7 +337,7 @@ public class ManifestFiles {
   private static ManifestFile copyManifestInternal(
       int formatVersion,
       ManifestReader<DataFile> reader,
-      OutputFile outputFile,
+      EncryptedOutputFile outputFile,
       long snapshotId,
       SnapshotSummary.Builder summaryBuilder,
       ManifestEntry.Status allowedEntryStatus) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 4865ccfc3b..cea907ddac 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -19,7 +19,9 @@
 package org.apache.iceberg;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;
@@ -37,6 +39,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   static final long UNASSIGNED_SEQ = -1L;
 
   private final OutputFile file;
+  private final ByteBuffer keyMetadataBuffer;
   private final int specId;
   private final FileAppender<ManifestEntry<F>> writer;
   private final Long snapshotId;
@@ -52,13 +55,14 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   private long deletedRows = 0L;
   private Long minDataSequenceNumber = null;
 
-  private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
-    this.file = file;
+  // Creates a writer that appends manifest entries for spec to the (possibly encrypted) file.
+  private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
+    // Write entries through the encrypting view of the file. For plain files wrapped with
+    // EncryptedFiles.plainAsEncryptedOutput, this is the original OutputFile.
+    this.file = file.encryptingOutputFile();
     this.specId = spec.specId();
-    this.writer = newAppender(spec, file);
+    this.writer = newAppender(spec, this.file);
     this.snapshotId = snapshotId;
     this.reused = new GenericManifestEntry<>(spec.partitionType());
     this.stats = new PartitionSummary(spec);
+    // Keep the key metadata bytes (null when the file has none) so they are recorded in the
+    // ManifestFile this writer produces.
+    this.keyMetadataBuffer = (file.keyMetadata() == null) ? null : file.keyMetadata().buffer();
   }
 
   protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
@@ -204,7 +208,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
         deletedFiles,
         deletedRows,
         stats.summaries(),
-        null);
+        keyMetadataBuffer);
   }
 
   @Override
@@ -216,7 +220,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   static class V2Writer extends ManifestWriter<DataFile> {
     private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;
 
-    V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+    // file may wrap a plain or an encrypting output; the ManifestWriter constructor handles both.
+    V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
       this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
@@ -250,7 +254,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
     private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;
 
-    V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+    // file may wrap a plain or an encrypting output; the ManifestWriter constructor handles both.
+    V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
       this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
@@ -289,7 +293,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   static class V1Writer extends ManifestWriter<DataFile> {
     private final V1Metadata.IndexedManifestEntry entryWrapper;
 
-    V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+    // file may wrap a plain or an encrypting output; the ManifestWriter constructor handles both.
+    V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
       this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
     }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 5d3ec6e35f..c1dc6b58b7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -38,7 +39,6 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Predicate;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -275,14 +275,14 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   private ManifestFile copyManifest(ManifestFile manifest) {
+    // Copies an appended manifest into a new manifest file written with this producer's snapshot ID.
     TableMetadata current = ops.current();
-    InputFile toCopy = ops.io().newInputFile(manifest.path());
-    OutputFile newManifestPath = newManifestOutput();
+    // Open the source from the ManifestFile itself (not only its path) so the FileIO also sees
+    // the manifest's metadata, including its key metadata, when opening the file.
+    InputFile toCopy = ops.io().newInputFile(manifest);
+    // The copy is created through the table's encryption manager (see newManifestOutputFile).
+    EncryptedOutputFile newManifestFile = newManifestOutputFile();
     return ManifestFiles.copyAppendManifest(
         current.formatVersion(),
         manifest.partitionSpecId(),
         toCopy,
         current.specsById(),
-        newManifestPath,
+        newManifestFile,
         snapshotId(),
         appendedManifestsSummary);
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 757d0b78bc..1c4491a684 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -43,6 +43,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.exceptions.CleanableFailure;
@@ -255,7 +257,6 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
           .run(index -> manifestFiles[index] = 
manifestsWithMetadata.get(manifests.get(index)));
 
       writer.addAll(Arrays.asList(manifestFiles));
-
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to write manifest list file");
     }
@@ -499,6 +500,11 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
                         "snap-%d-%d-%s", snapshotId(), 
attempt.incrementAndGet(), commitUUID))));
   }
 
+  /**
+   * @deprecated will be removed in 1.7.0; Use {@link 
SnapshotProducer#newManifestOutputFile}
+   *     instead
+   */
+  @Deprecated
   protected OutputFile newManifestOutput() {
     return ops.io()
         .newOutputFile(
@@ -506,14 +512,22 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
                 FileFormat.AVRO.addExtension(commitUUID + "-m" + 
manifestCount.getAndIncrement())));
   }
 
+  /**
+   * Create the {@link EncryptedOutputFile} for the next manifest written by this producer.
+   *
+   * <p>The file is placed under the table's metadata location and is created through the table's
+   * encryption manager, combined with the table's FileIO.
+   *
+   * @return a new output file whose name combines this commit's UUID and a manifest counter
+   */
+  protected EncryptedOutputFile newManifestOutputFile() {
+    String manifestName = commitUUID + "-m" + manifestCount.getAndIncrement();
+    String location = ops.metadataFileLocation(FileFormat.AVRO.addExtension(manifestName));
+    EncryptingFileIO encryptingIO = EncryptingFileIO.combine(ops.io(), ops.encryption());
+    return encryptingIO.newEncryptingOutputFile(location);
+  }
+
   protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
+    // The output comes from newManifestOutputFile(), so new data manifests are created through
+    // the table's encryption manager.
     return ManifestFiles.write(
-        ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
+        ops.current().formatVersion(), spec, newManifestOutputFile(), snapshotId());
   }
 
   protected ManifestWriter<DeleteFile> newDeleteManifestWriter(PartitionSpec spec) {
+    // The output comes from newManifestOutputFile(), so new delete manifests are created through
+    // the table's encryption manager.
     return ManifestFiles.writeDeleteManifest(
-        ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
+        ops.current().formatVersion(), spec, newManifestOutputFile(), snapshotId());
   }
 
   protected RollingManifestWriter<DataFile> 
newRollingManifestWriter(PartitionSpec spec) {
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index fddcee0374..30b04cd731 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -43,7 +43,8 @@ class V1Metadata {
           ManifestFile.PARTITION_SUMMARIES,
           ManifestFile.ADDED_ROWS_COUNT,
           ManifestFile.EXISTING_ROWS_COUNT,
-          ManifestFile.DELETED_ROWS_COUNT);
+          ManifestFile.DELETED_ROWS_COUNT,
+          ManifestFile.KEY_METADATA);
 
   /**
    * A wrapper class to write any ManifestFile implementation to Avro using 
the v1 schema.
@@ -97,6 +98,8 @@ class V1Metadata {
           return existingRowsCount();
         case 10:
           return deletedRowsCount();
+        case 11:
+          return keyMetadata();
         default:
           throw new UnsupportedOperationException("Unknown field ordinal: " + 
pos);
       }
@@ -187,6 +190,11 @@ class V1Metadata {
       return wrapped.partitions();
     }
 
+    @Override
+    public ByteBuffer keyMetadata() {
+      // Delegate so the v1 manifest list schema can write KEY_METADATA (field position 11).
+      return wrapped.keyMetadata();
+    }
+
     @Override
     public ManifestFile copy() {
       return wrapped.copy();
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index ba6bb4a319..8f3b71d399 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -47,7 +47,8 @@ class V2Metadata {
           ManifestFile.ADDED_ROWS_COUNT.asRequired(),
           ManifestFile.EXISTING_ROWS_COUNT.asRequired(),
           ManifestFile.DELETED_ROWS_COUNT.asRequired(),
-          ManifestFile.PARTITION_SUMMARIES);
+          ManifestFile.PARTITION_SUMMARIES,
+          ManifestFile.KEY_METADATA);
 
   /**
    * A wrapper class to write any ManifestFile implementation to Avro using 
the v2 write schema.
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
index 47e3c2c284..c69a8626f6 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
@@ -80,7 +80,7 @@ class AvroFileAppender<D> implements FileAppender<D> {
   public long length() {
     if (stream != null) {
       try {
-        return stream.getPos();
+        return stream.storedLength();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to get stream length");
       }
diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
index b4f723cca3..da437b7540 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
@@ -129,6 +129,11 @@ public class AesGcmOutputStream extends PositionOutputStream {
     targetStream.close();
   }
 
+  /**
+   * Return the number of bytes stored by the target stream, delegating to it. The target also
+   * receives bytes this stream adds itself, such as the header written by {@code writeHeader},
+   * so this value can differ from {@link #getPos()}.
+   */
+  @Override
+  public long storedLength() throws IOException {
+    return targetStream.storedLength();
+  }
+
   private void writeHeader() throws IOException {
     targetStream.write(HEADER_BYTES);
     isHeaderWritten = true;
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
index c0fc41ca13..b0e2e74d93 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
@@ -57,5 +57,9 @@ public class EncryptedFiles {
         encryptedOutputFile, 
BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
   }
 
+  /**
+   * Wrap an unencrypted {@link OutputFile} as an {@link EncryptedOutputFile} with empty key
+   * metadata, so code written against {@link EncryptedOutputFile} can also write plain files.
+   *
+   * @param plainOutputFile an output file whose contents will be written without encryption
+   * @return an {@link EncryptedOutputFile} whose encrypting output file is {@code plainOutputFile}
+   */
+  public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile plainOutputFile) {
+    return new BaseEncryptedOutputFile(plainOutputFile, EncryptionKeyMetadata.EMPTY);
+  }
+
   private EncryptedFiles() {}
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
new file mode 100644
index 0000000000..b64324ec57
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.avro.InvalidAvroMagicException;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptingFileIO;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionTestHelpers;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Verifies that manifests written through an encrypting output can only be read with decryption. */
+public class TestManifestEncryption {
+  private static final FileIO FILE_IO = new TestTables.LocalFileIO();
+
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.LongType.get()),
+          required(2, "timestamp", Types.TimestampType.withZone()),
+          required(3, "category", Types.StringType.get()),
+          required(4, "data", Types.StringType.get()),
+          required(5, "double", Types.DoubleType.get()));
+
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA)
+          .identity("category")
+          .hour("timestamp")
+          .bucket("id", 16)
+          .build();
+
+  private static final long SNAPSHOT_ID = 987134631982734L;
+  private static final String PATH =
+      "s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro";
+  private static final FileFormat FORMAT = FileFormat.AVRO;
+  private static final PartitionData PARTITION =
+      DataFiles.data(SPEC, "category=cheesy/timestamp_hour=10/id_bucket=3");
+  private static final Metrics METRICS =
+      new Metrics(
+          1587L,
+          ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes
+          ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts
+          ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts
+          ImmutableMap.of(5, 10L), // nan value counts
+          ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds
+          ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds
+  private static final List<Long> OFFSETS = ImmutableList.of(4L);
+  private static final Integer SORT_ORDER_ID = 2;
+
+  private static final ByteBuffer CONTENT_KEY_METADATA = ByteBuffer.allocate(100);
+
+  private static final DataFile DATA_FILE =
+      new GenericDataFile(
+          0,
+          PATH,
+          FORMAT,
+          PARTITION,
+          150972L,
+          METRICS,
+          CONTENT_KEY_METADATA,
+          OFFSETS,
+          SORT_ORDER_ID);
+
+  private static final List<Integer> EQUALITY_IDS = ImmutableList.of(1);
+  private static final int[] EQUALITY_ID_ARR = new int[] {1};
+
+  private static final DeleteFile DELETE_FILE =
+      new GenericDeleteFile(
+          SPEC.specId(),
+          FileContent.EQUALITY_DELETES,
+          PATH,
+          FORMAT,
+          PARTITION,
+          22905L,
+          METRICS,
+          EQUALITY_ID_ARR,
+          SORT_ORDER_ID,
+          null,
+          CONTENT_KEY_METADATA);
+
+  private static final EncryptionManager ENCRYPTION_MANAGER =
+      EncryptionTestHelpers.createEncryptionManager();
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testV1Write() throws IOException {
+    ManifestFile manifest = writeManifest(1);
+    checkEntry(
+        readManifest(manifest),
+        ManifestWriter.UNASSIGNED_SEQ,
+        ManifestWriter.UNASSIGNED_SEQ,
+        FileContent.DATA);
+  }
+
+  @Test
+  public void testV2Write() throws IOException {
+    ManifestFile manifest = writeManifest(2);
+    checkEntry(
+        readManifest(manifest),
+        ManifestWriter.UNASSIGNED_SEQ,
+        ManifestWriter.UNASSIGNED_SEQ,
+        FileContent.DATA);
+  }
+
+  @Test
+  public void testV2WriteDelete() throws IOException {
+    ManifestFile manifest = writeDeleteManifest(2);
+    checkEntry(
+        readDeleteManifest(manifest),
+        ManifestWriter.UNASSIGNED_SEQ,
+        ManifestWriter.UNASSIGNED_SEQ,
+        FileContent.EQUALITY_DELETES);
+  }
+
+  void checkEntry(
+      ManifestEntry<?> entry,
+      Long expectedDataSequenceNumber,
+      Long expectedFileSequenceNumber,
+      FileContent content) {
+    Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status());
+    Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
+    Assert.assertEquals(
+        "Data sequence number", expectedDataSequenceNumber, entry.dataSequenceNumber());
+    Assert.assertEquals(
+        "File sequence number", expectedFileSequenceNumber, entry.fileSequenceNumber());
+    checkDataFile(entry.file(), content);
+  }
+
+  void checkDataFile(ContentFile<?> dataFile, FileContent content) {
+    // DataFile and DeleteFile are both ContentFiles, so this method can check either kind
+    Assert.assertEquals("Content", content, dataFile.content());
+    Assert.assertEquals("Path", PATH, dataFile.path());
+    Assert.assertEquals("Format", FORMAT, dataFile.format());
+    Assert.assertEquals("Partition", PARTITION, dataFile.partition());
+    Assert.assertEquals("Record count", METRICS.recordCount(), (Long) dataFile.recordCount());
+    Assert.assertEquals("Column sizes", METRICS.columnSizes(), dataFile.columnSizes());
+    Assert.assertEquals("Value counts", METRICS.valueCounts(), dataFile.valueCounts());
+    Assert.assertEquals("Null value counts", METRICS.nullValueCounts(), dataFile.nullValueCounts());
+    Assert.assertEquals("NaN value counts", METRICS.nanValueCounts(), dataFile.nanValueCounts());
+    Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds());
+    Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds());
+    Assert.assertEquals("Sort order id", SORT_ORDER_ID, dataFile.sortOrderId());
+    if (dataFile.content() == FileContent.EQUALITY_DELETES) {
+      Assert.assertEquals(EQUALITY_IDS, dataFile.equalityFieldIds());
+    } else {
+      Assert.assertNull(dataFile.equalityFieldIds());
+    }
+  }
+
+  /** Returns a FileIO that reads files as stored, without decrypting them. */
+  private static FileIO plaintextIO() {
+    return EncryptingFileIO.combine(FILE_IO, PlaintextEncryptionManager.instance());
+  }
+
+  /** Returns a FileIO that decrypts files with the test encryption manager. */
+  private static FileIO encryptingIO() {
+    return EncryptingFileIO.combine(FILE_IO, ENCRYPTION_MANAGER);
+  }
+
+  /** Creates a new local Avro output file encrypted by the test encryption manager. */
+  private EncryptedOutputFile newEncryptedManifestFile() throws IOException {
+    OutputFile manifestFile =
+        Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+    return ENCRYPTION_MANAGER.encrypt(manifestFile);
+  }
+
+  private ManifestFile writeManifest(int formatVersion) throws IOException {
+    return writeManifest(DATA_FILE, formatVersion);
+  }
+
+  private ManifestFile writeManifest(DataFile file, int formatVersion) throws IOException {
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(formatVersion, SPEC, newEncryptedManifestFile(), SNAPSHOT_ID);
+    try {
+      writer.add(file);
+    } finally {
+      writer.close();
+    }
+    return writer.toManifestFile();
+  }
+
+  private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws IOException {
+    // The writer must record the encryption key metadata in the resulting ManifestFile
+    Assert.assertNotNull("Encrypted manifest should record key metadata", manifest.keyMetadata());
+
+    // Reading without decryption must fail: the stored bytes are not a valid Avro file
+    Assertions.assertThatThrownBy(() -> ManifestFiles.read(manifest, plaintextIO(), null))
+        .isInstanceOf(RuntimeIOException.class)
+        .hasMessageContaining("Failed to open file")
+        .hasCauseInstanceOf(InvalidAvroMagicException.class);
+
+    try (CloseableIterable<ManifestEntry<DataFile>> reader =
+        ManifestFiles.read(manifest, encryptingIO(), null).entries()) {
+      List<ManifestEntry<DataFile>> files = Lists.newArrayList(reader);
+      Assert.assertEquals("Should contain only one data file", 1, files.size());
+      return files.get(0);
+    }
+  }
+
+  private ManifestFile writeDeleteManifest(int formatVersion) throws IOException {
+    ManifestWriter<DeleteFile> writer =
+        ManifestFiles.writeDeleteManifest(
+            formatVersion, SPEC, newEncryptedManifestFile(), SNAPSHOT_ID);
+    try {
+      writer.add(DELETE_FILE);
+    } finally {
+      writer.close();
+    }
+    return writer.toManifestFile();
+  }
+
+  private ManifestEntry<DeleteFile> readDeleteManifest(ManifestFile manifest) throws IOException {
+    // The writer must record the encryption key metadata in the resulting ManifestFile
+    Assert.assertNotNull("Encrypted manifest should record key metadata", manifest.keyMetadata());
+
+    // Reading without decryption must fail, exactly as for data manifests
+    Assertions.assertThatThrownBy(
+            () -> ManifestFiles.readDeleteManifest(manifest, plaintextIO(), null))
+        .isInstanceOf(RuntimeIOException.class)
+        .hasMessageContaining("Failed to open file")
+        .hasCauseInstanceOf(InvalidAvroMagicException.class);
+
+    try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
+        ManifestFiles.readDeleteManifest(manifest, encryptingIO(), null).entries()) {
+      List<ManifestEntry<DeleteFile>> entries = Lists.newArrayList(reader);
+      Assert.assertEquals("Should contain only one delete file", 1, entries.size());
+      return entries.get(0);
+    }
+  }
+}

Reply via email to