This is an automated email from the ASF dual-hosted git repository.

blackmwk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ca2e6e336 feat(encryption) [5/N] Support encryption: Encryption 
Manager (#2383)
ca2e6e336 is described below

commit ca2e6e3365c32319f6172d4676c40f35fa039710
Author: Xander <[email protected]>
AuthorDate: Wed May 13 09:31:14 2026 +0100

    feat(encryption) [5/N] Support encryption: Encryption Manager (#2383)
    
    ## Which issue does this PR close?
    
    Part of https://github.com/apache/iceberg-rust/issues/2034
    
    ## What changes are included in this PR?
    
    Stacked on #2340. Adds `EncryptionManager` — handles two-layer envelope
    encryption (master key → KEK → DEK) for Iceberg tables.
    
    ### New files
    
    - **`manager.rs`** — `EncryptionManager` with `typed_builder`
    construction:
    - `encrypt()` — wraps an `OutputFile` in an `EncryptedOutputFile` with a
    freshly generated DEK + AAD prefix
    - `wrap_key_metadata()` / `unwrap_key_metadata()` — KEK envelope
    wrap/unwrap for manifest list key metadata
    - KEK lifecycle: creation, rotation (730-day NIST SP 800-57 lifespan),
    caching (1-hour TTL via moka)
    - Builder exposes both `encryption_keys(HashMap)` (bulk, for production
    load from `TableMetadata`) and `add_encryption_key(EncryptedKey)`
    (one-at-a-time, ergonomic for tests)
    - **`io.rs`** — `EncryptedInputFile` / `EncryptedOutputFile` wrappers
    that hold a `StandardKeyMetadata` and lazily build the AGS1 stream
    cipher on `reader()` / `writer()`
    
    ### Design decisions
    
    - **No `NativeEncryptedInputFile` / `NativeEncryptedOutputFile`
    wrappers** — For PME, `StandardKeyMetadata` already carries the DEK and
    AAD prefix. Wrapper types that just bundle an `InputFile`/`OutputFile`
    with the same data are unnecessary indirection.
    - **No `decrypt()` method on the manager** — it had no manager state to
    use; callers do `StandardKeyMetadata::decode(bytes)` +
    `EncryptedInputFile::new(input, metadata)` directly. Mirrors the fact
    that Java's `decrypt()` is also a thin factory unrelated to manager
    state.
    - **No `AesGcmFileDecryptor` / `AesGcmFileEncryptor`** — the encrypted
    file types store `StandardKeyMetadata` directly and construct the
    underlying `AesGcmFileRead` / `AesGcmFileWrite` on demand. One fewer
    layer.
    - **KEK timestamp AAD is required** — missing/tampered timestamps fail
    with a clear `DataInvalid` error rather than silently passing `None` AAD
    (which would weaken the tampering defense).
    
    ### How this differs from Java's `StandardEncryptionManager`
    
    - **KEK management is explicit**: Java's `addManifestListKeyMetadata()`
    mutates an internal map and callers need to downcast to
    `StandardEncryptionManager` to access the keys. Our
    `wrap_key_metadata()` returns `(wrapped_key, Option<new_kek>)` directly
    — no hidden mutation, no downcasting. [Java
    
reference](https://github.com/apache/iceberg/blame/main/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java#L146-L151).
    - **No `EncryptionUtil` grab-bag**: Java needs a static utility class
    (`decryptManifestListKeyMetadata`, `encryptManifestListKeyMetadata`,
    etc.) because the interface is too narrow. Here those are just methods
    on `EncryptionManager`.
    - **`table_key_id` enforced at compile time**: required via
    `typed_builder`, can't forget it. Unencrypted tables use
    `Option<EncryptionManager>` instead of Java's
    `PlaintextEncryptionManager`.
    - **Config values match Java exactly**: KEK lifespan (730 days), cache
    TTL (1 hour), AAD prefix length (16 bytes) are all hardcoded.
    
    ### Usage notes (for follow-up PRs that wire this in)
    
    `load_manifest_list` will do something like:
    ```rust
    let unwrapped_km = em
        .unwrap_key_metadata(encrypted_key, 
table_metadata.encryption_keys_map())
        .await?;
    let metadata = StandardKeyMetadata::decode(&unwrapped_km)?;
    let input = file_io.new_input(&self.manifest_list)?;
    EncryptedInputFile::new(input, metadata).read().await?
    ```
    
    ## Are these changes tested?
    Yes — 11 manager tests covering KEK creation/rotation/caching,
    wrap/unwrap roundtrips, AAD tampering and missing-timestamp rejection,
    plus a full encrypt-then-read roundtrip via `EncryptedInputFile`.
---
 crates/iceberg/src/encryption/crypto.rs         |   9 +
 crates/iceberg/src/encryption/file_decryptor.rs | 156 ------
 crates/iceberg/src/encryption/file_encryptor.rs | 138 -----
 crates/iceberg/src/encryption/io.rs             | 220 ++++++++
 crates/iceberg/src/encryption/manager.rs        | 671 ++++++++++++++++++++++++
 crates/iceberg/src/encryption/mod.rs            |   8 +-
 6 files changed, 904 insertions(+), 298 deletions(-)

diff --git a/crates/iceberg/src/encryption/crypto.rs 
b/crates/iceberg/src/encryption/crypto.rs
index 0f6a9eff4..02d085ebf 100644
--- a/crates/iceberg/src/encryption/crypto.rs
+++ b/crates/iceberg/src/encryption/crypto.rs
@@ -177,6 +177,15 @@ impl SecureKey {
     }
 }
 
+impl TryFrom<SensitiveBytes> for SecureKey {
+    type Error = Error;
+
+    fn try_from(key: SensitiveBytes) -> Result<Self> {
+        let key_size = AesKeySize::from_key_length(key.len())?;
+        Ok(Self { key, key_size })
+    }
+}
+
 /// AES-GCM cipher for encrypting and decrypting data.
 pub struct AesGcmCipher {
     key: SensitiveBytes,
diff --git a/crates/iceberg/src/encryption/file_decryptor.rs 
b/crates/iceberg/src/encryption/file_decryptor.rs
deleted file mode 100644
index e44c0e1d7..000000000
--- a/crates/iceberg/src/encryption/file_decryptor.rs
+++ /dev/null
@@ -1,156 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! File-level decryption helper for AGS1 stream-encrypted files.
-
-use std::fmt;
-use std::sync::Arc;
-
-use super::crypto::{AesGcmCipher, SecureKey};
-use super::stream::AesGcmFileRead;
-use crate::Result;
-use crate::io::FileRead;
-
-/// Holds the decryption material for a single encrypted file.
-///
-/// Created from a plaintext DEK and AAD prefix, then used to wrap
-/// an encrypted file reader for transparent decryption on read.
-pub struct AesGcmFileDecryptor {
-    cipher: Arc<AesGcmCipher>,
-    aad_prefix: Box<[u8]>,
-}
-
-impl AesGcmFileDecryptor {
-    /// Creates a new `AesGcmFileDecryptor` from a plaintext DEK and AAD 
prefix.
-    pub fn new(dek: &[u8], aad_prefix: impl Into<Box<[u8]>>) -> Result<Self> {
-        let key = SecureKey::new(dek)?;
-        let cipher = Arc::new(AesGcmCipher::new(key));
-        Ok(Self {
-            cipher,
-            aad_prefix: aad_prefix.into(),
-        })
-    }
-
-    /// Wraps a raw encrypted-file reader in a decrypting [`AesGcmFileRead`].
-    pub fn wrap_reader(
-        &self,
-        reader: Box<dyn FileRead>,
-        encrypted_file_length: u64,
-    ) -> Result<Box<dyn FileRead>> {
-        let decrypting = AesGcmFileRead::new(
-            reader,
-            Arc::clone(&self.cipher),
-            self.aad_prefix.clone(),
-            encrypted_file_length,
-        )?;
-        Ok(Box::new(decrypting))
-    }
-
-    /// Calculates the plaintext length from an encrypted file's total length.
-    pub fn plaintext_length(&self, encrypted_file_length: u64) -> Result<u64> {
-        AesGcmFileRead::calculate_plaintext_length(encrypted_file_length)
-    }
-}
-
-impl fmt::Debug for AesGcmFileDecryptor {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("AesGcmFileDecryptor")
-            .field("aad_prefix_len", &self.aad_prefix.len())
-            .finish_non_exhaustive()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::ops::Range;
-
-    use bytes::Bytes;
-
-    use super::*;
-    use crate::encryption::AesGcmFileEncryptor;
-    use crate::io::FileWrite;
-
-    struct MemoryFileRead(Bytes);
-
-    #[async_trait::async_trait]
-    impl FileRead for MemoryFileRead {
-        async fn read(&self, range: Range<u64>) -> Result<Bytes> {
-            Ok(self.0.slice(range.start as usize..range.end as usize))
-        }
-    }
-
-    struct MemoryFileWrite {
-        buffer: std::sync::Arc<std::sync::Mutex<Vec<u8>>>,
-    }
-
-    #[async_trait::async_trait]
-    impl FileWrite for MemoryFileWrite {
-        async fn write(&mut self, bs: Bytes) -> Result<()> {
-            self.buffer.lock().unwrap().extend_from_slice(&bs);
-            Ok(())
-        }
-
-        async fn close(&mut self) -> Result<()> {
-            Ok(())
-        }
-    }
-
-    #[tokio::test]
-    async fn test_wrap_reader_roundtrip() {
-        let key = b"0123456789abcdef";
-        let aad_prefix = b"test-aad-prefix!";
-        let plaintext = b"Hello from file decryptor!";
-
-        // Encrypt via the encryptor wrapper
-        let encryptor = AesGcmFileEncryptor::new(key.as_slice(), 
aad_prefix.as_slice()).unwrap();
-        let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
-        let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite {
-            buffer: buffer.clone(),
-        }));
-        writer.write(Bytes::from(plaintext.to_vec())).await.unwrap();
-        writer.close().await.unwrap();
-        let encrypted = buffer.lock().unwrap().clone();
-        let encrypted_len = encrypted.len() as u64;
-
-        // Decrypt via the decryptor wrapper
-        let decryptor = AesGcmFileDecryptor::new(key.as_slice(), 
aad_prefix.as_slice()).unwrap();
-        let reader = decryptor
-            .wrap_reader(
-                Box::new(MemoryFileRead(Bytes::from(encrypted))),
-                encrypted_len,
-            )
-            .unwrap();
-
-        let result = reader.read(0..plaintext.len() as u64).await.unwrap();
-        assert_eq!(&result[..], plaintext);
-    }
-
-    #[tokio::test]
-    async fn test_invalid_key_length() {
-        let result = AesGcmFileDecryptor::new(b"too-short", b"aad".as_slice());
-        assert!(result.is_err());
-    }
-
-    #[tokio::test]
-    async fn test_plaintext_length() {
-        let decryptor = AesGcmFileDecryptor::new(b"0123456789abcdef", 
b"aad".as_slice()).unwrap();
-        // header(8) + nonce(12) + 10 bytes ciphertext + tag(16) = 46
-        let encrypted_len = 8 + 12 + 10 + 16;
-        let plain_len = decryptor.plaintext_length(encrypted_len).unwrap();
-        assert_eq!(plain_len, 10);
-    }
-}
diff --git a/crates/iceberg/src/encryption/file_encryptor.rs 
b/crates/iceberg/src/encryption/file_encryptor.rs
deleted file mode 100644
index 773438ad8..000000000
--- a/crates/iceberg/src/encryption/file_encryptor.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! File-level encryption helper for AGS1 stream-encrypted files.
-
-use std::fmt;
-use std::sync::Arc;
-
-use super::crypto::{AesGcmCipher, SecureKey};
-use super::stream::AesGcmFileWrite;
-use crate::Result;
-use crate::io::FileWrite;
-
-/// Holds the encryption material for a single encrypted file.
-///
-/// This is the write-side counterpart to
-/// [`AesGcmFileDecryptor`](super::AesGcmFileDecryptor). Created from
-/// a plaintext DEK and AAD prefix, then used to wrap an output writer
-/// for transparent encryption on write.
-pub struct AesGcmFileEncryptor {
-    cipher: Arc<AesGcmCipher>,
-    aad_prefix: Box<[u8]>,
-}
-
-impl AesGcmFileEncryptor {
-    /// Creates a new `AesGcmFileEncryptor` from a plaintext DEK and AAD 
prefix.
-    pub fn new(dek: &[u8], aad_prefix: impl Into<Box<[u8]>>) -> Result<Self> {
-        let key = SecureKey::new(dek)?;
-        let cipher = Arc::new(AesGcmCipher::new(key));
-        Ok(Self {
-            cipher,
-            aad_prefix: aad_prefix.into(),
-        })
-    }
-
-    /// Wraps a raw output writer in an encrypting [`AesGcmFileWrite`].
-    pub fn wrap_writer(&self, writer: Box<dyn FileWrite>) -> Box<dyn 
FileWrite> {
-        Box::new(AesGcmFileWrite::new(
-            writer,
-            Arc::clone(&self.cipher),
-            self.aad_prefix.clone(),
-        ))
-    }
-}
-
-impl fmt::Debug for AesGcmFileEncryptor {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("AesGcmFileEncryptor")
-            .field("aad_prefix_len", &self.aad_prefix.len())
-            .finish_non_exhaustive()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::ops::Range;
-
-    use bytes::Bytes;
-
-    use super::*;
-    use crate::encryption::AesGcmFileDecryptor;
-    use crate::io::FileRead;
-
-    struct MemoryFileRead(Bytes);
-
-    #[async_trait::async_trait]
-    impl FileRead for MemoryFileRead {
-        async fn read(&self, range: Range<u64>) -> Result<Bytes> {
-            Ok(self.0.slice(range.start as usize..range.end as usize))
-        }
-    }
-
-    struct MemoryFileWrite {
-        buffer: std::sync::Arc<std::sync::Mutex<Vec<u8>>>,
-    }
-
-    #[async_trait::async_trait]
-    impl FileWrite for MemoryFileWrite {
-        async fn write(&mut self, bs: Bytes) -> Result<()> {
-            self.buffer.lock().unwrap().extend_from_slice(&bs);
-            Ok(())
-        }
-
-        async fn close(&mut self) -> Result<()> {
-            Ok(())
-        }
-    }
-
-    #[tokio::test]
-    async fn test_wrap_writer_roundtrip() {
-        let key = b"0123456789abcdef";
-        let aad_prefix = b"test-aad-prefix!";
-        let plaintext = b"Hello from file encryptor!";
-
-        // Encrypt via the encryptor wrapper
-        let encryptor = AesGcmFileEncryptor::new(key.as_slice(), 
aad_prefix.as_slice()).unwrap();
-        let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
-        let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite {
-            buffer: buffer.clone(),
-        }));
-        writer.write(Bytes::from(plaintext.to_vec())).await.unwrap();
-        writer.close().await.unwrap();
-        let encrypted = buffer.lock().unwrap().clone();
-        let encrypted_len = encrypted.len() as u64;
-
-        // Decrypt via the decryptor wrapper
-        let decryptor = AesGcmFileDecryptor::new(key.as_slice(), 
aad_prefix.as_slice()).unwrap();
-        let reader = decryptor
-            .wrap_reader(
-                Box::new(MemoryFileRead(Bytes::from(encrypted))),
-                encrypted_len,
-            )
-            .unwrap();
-
-        let result = reader.read(0..plaintext.len() as u64).await.unwrap();
-        assert_eq!(&result[..], plaintext);
-    }
-
-    #[tokio::test]
-    async fn test_invalid_key_length() {
-        let result = AesGcmFileEncryptor::new(b"bad-key", b"aad".as_slice());
-        assert!(result.is_err());
-    }
-}
diff --git a/crates/iceberg/src/encryption/io.rs 
b/crates/iceberg/src/encryption/io.rs
new file mode 100644
index 000000000..c3d81dd85
--- /dev/null
+++ b/crates/iceberg/src/encryption/io.rs
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Encrypted file wrappers for InputFile / OutputFile.
+
+use std::sync::Arc;
+
+use bytes::Bytes;
+
+use super::crypto::{AesGcmCipher, SecureKey};
+use super::key_metadata::StandardKeyMetadata;
+use super::stream::{AesGcmFileRead, AesGcmFileWrite};
+use crate::Result;
+use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
+
+/// An AGS1 stream-encrypted input file wrapping a plain [`InputFile`].
+///
+/// Transparently decrypts on read.
+pub struct EncryptedInputFile {
+    inner: InputFile,
+    key_metadata: StandardKeyMetadata,
+}
+
+impl EncryptedInputFile {
+    /// Creates a new encrypted input file.
+    pub fn new(inner: InputFile, key_metadata: StandardKeyMetadata) -> Self {
+        Self {
+            inner,
+            key_metadata,
+        }
+    }
+
+    /// Absolute path of the file.
+    pub fn location(&self) -> &str {
+        self.inner.location()
+    }
+
+    /// Check if file exists.
+    pub async fn exists(&self) -> Result<bool> {
+        self.inner.exists().await
+    }
+
+    /// Fetch and returns metadata of file.
+    ///
+    /// The returned size is the **plaintext** size.
+    pub async fn metadata(&self) -> Result<FileMetadata> {
+        let raw_meta = self.inner.metadata().await?;
+        let plaintext_size = 
AesGcmFileRead::calculate_plaintext_length(raw_meta.size)?;
+        Ok(FileMetadata {
+            size: plaintext_size,
+        })
+    }
+
+    /// Read and returns whole content of file (decrypted plaintext).
+    pub async fn read(&self) -> Result<Bytes> {
+        let meta = self.metadata().await?;
+        let reader = self.reader().await?;
+        reader.read(0..meta.size).await
+    }
+
+    /// Creates a reader that transparently decrypts on each read.
+    pub async fn reader(&self) -> Result<Box<dyn FileRead>> {
+        let raw_meta = self.inner.metadata().await?;
+        let raw_reader = self.inner.reader().await?;
+        let cipher = build_cipher(&self.key_metadata)?;
+        let aad_prefix: Box<[u8]> = 
self.key_metadata.aad_prefix().unwrap_or_default().into();
+        let decrypting = AesGcmFileRead::new(raw_reader, cipher, aad_prefix, 
raw_meta.size)?;
+        Ok(Box::new(decrypting))
+    }
+
+    /// Returns a reference to the file's key metadata.
+    pub fn key_metadata(&self) -> &StandardKeyMetadata {
+        &self.key_metadata
+    }
+
+    /// Consumes self and returns the underlying plain input file.
+    pub fn into_inner(self) -> InputFile {
+        self.inner
+    }
+}
+
+impl std::fmt::Debug for EncryptedInputFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("EncryptedInputFile")
+            .field("path", &self.inner.location())
+            .finish_non_exhaustive()
+    }
+}
+
+/// An AGS1 stream-encrypted output file wrapping a plain [`OutputFile`].
+///
+/// Transparently encrypts on write.
+pub struct EncryptedOutputFile {
+    inner: OutputFile,
+    key_metadata: StandardKeyMetadata,
+}
+
+impl EncryptedOutputFile {
+    /// Creates a new encrypted output file.
+    pub fn new(inner: OutputFile, key_metadata: StandardKeyMetadata) -> Self {
+        Self {
+            inner,
+            key_metadata,
+        }
+    }
+
+    /// Returns a reference to the file's key metadata.
+    pub fn key_metadata(&self) -> &StandardKeyMetadata {
+        &self.key_metadata
+    }
+
+    /// Absolute path of the file.
+    pub fn location(&self) -> &str {
+        self.inner.location()
+    }
+
+    /// Creates a writer that transparently encrypts on each write.
+    pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+        let raw_writer = self.inner.writer().await?;
+        let cipher = build_cipher(&self.key_metadata)?;
+        let aad_prefix: Box<[u8]> = 
self.key_metadata.aad_prefix().unwrap_or_default().into();
+        Ok(Box::new(AesGcmFileWrite::new(
+            raw_writer, cipher, aad_prefix,
+        )))
+    }
+
+    /// Write bytes to file (transparently encrypted).
+    pub async fn write(&self, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer().await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    /// Deletes the underlying file.
+    pub async fn delete(&self) -> Result<()> {
+        self.inner.delete().await
+    }
+
+    /// Consumes self and returns the underlying plain output file.
+    pub fn into_inner(self) -> OutputFile {
+        self.inner
+    }
+}
+
+impl std::fmt::Debug for EncryptedOutputFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("EncryptedOutputFile")
+            .field("path", &self.inner.location())
+            .finish_non_exhaustive()
+    }
+}
+
+fn build_cipher(metadata: &StandardKeyMetadata) -> Result<Arc<AesGcmCipher>> {
+    let key = SecureKey::new(metadata.encryption_key().as_bytes())?;
+    Ok(Arc::new(AesGcmCipher::new(key)))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIO;
+
+    fn key_metadata() -> StandardKeyMetadata {
+        
StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!")
+    }
+
+    #[tokio::test]
+    async fn test_write_read_roundtrip() {
+        let fileio = FileIO::new_with_memory();
+        let path = "memory:///test/io_roundtrip.bin";
+        let plaintext = b"Hello from EncryptedInputFile/EncryptedOutputFile!";
+
+        let output = 
EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata());
+        output.write(Bytes::from(plaintext.to_vec())).await.unwrap();
+
+        let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), 
key_metadata());
+        let content = input.read().await.unwrap();
+        assert_eq!(&content[..], plaintext);
+    }
+
+    #[tokio::test]
+    async fn test_metadata_returns_plaintext_size() {
+        let fileio = FileIO::new_with_memory();
+        let path = "memory:///test/io_metadata.bin";
+        let plaintext = b"some bytes to measure";
+
+        let output = 
EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata());
+        output.write(Bytes::from(plaintext.to_vec())).await.unwrap();
+
+        let raw_size = fileio
+            .new_input(path)
+            .unwrap()
+            .metadata()
+            .await
+            .unwrap()
+            .size;
+        assert!(
+            raw_size > plaintext.len() as u64,
+            "encrypted file should be larger than plaintext (header + nonce + 
tag)"
+        );
+
+        let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), 
key_metadata());
+        let meta = input.metadata().await.unwrap();
+        assert_eq!(meta.size, plaintext.len() as u64);
+    }
+}
diff --git a/crates/iceberg/src/encryption/manager.rs 
b/crates/iceberg/src/encryption/manager.rs
new file mode 100644
index 000000000..a4c5b9c64
--- /dev/null
+++ b/crates/iceberg/src/encryption/manager.rs
@@ -0,0 +1,671 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Encryption manager for file-level encryption and two-layer envelope key 
management.
+//!
+//! [`EncryptionManager`] provides file-level `decrypt` / `encrypt`
+//! operations matching Java's 
`org.apache.iceberg.encryption.EncryptionManager`,
+//! using envelope encryption:
+//! - A master key (in KMS) wraps a Key Encryption Key (KEK)
+//! - The KEK wraps Data Encryption Keys (DEKs) locally
+
+use std::collections::HashMap;
+use std::fmt;
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
+
+use aes_gcm::aead::OsRng;
+use aes_gcm::aead::rand_core::RngCore;
+use chrono::Utc;
+use moka::future::Cache;
+use uuid::Uuid;
+
+const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000;
+
+use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
+use super::io::EncryptedOutputFile;
+use super::key_metadata::StandardKeyMetadata;
+use super::kms::KeyManagementClient;
+use crate::io::OutputFile;
+use crate::spec::EncryptedKey;
+use crate::{Error, ErrorKind, Result};
+
+/// Property key for the KEK creation timestamp (milliseconds since epoch).
+/// Matches Java's `StandardEncryptionManager.KEY_TIMESTAMP`.
+pub const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP";
+
+/// Default KEK lifespan in days, per NIST SP 800-57.
+const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730;
+
+/// Default cache TTL for unwrapped KEKs.
+const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600);
+
+/// Default AAD prefix length in bytes.
+/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`.
+const AAD_PREFIX_LENGTH: usize = 16;
+
+/// File-level encryption manager using two-layer envelope encryption.
+///
+/// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls.
+#[derive(typed_builder::TypedBuilder)]
+#[builder(mutators(
+    /// Add an encryption key (KEK or wrapped key metadata entry).
+    pub fn add_encryption_key(&mut self, key: EncryptedKey) {
+        self.encryption_keys
+            .write()
+            .expect("encryption_keys lock poisoned")
+            .insert(key.key_id().to_string(), key);
+    }
+    /// Set all encryption keys from table metadata.
+    pub fn encryption_keys(&mut self, keys: HashMap<String, EncryptedKey>) {
+        self.encryption_keys = RwLock::new(keys);
+    }
+))]
+pub struct EncryptionManager {
+    kms_client: Arc<dyn KeyManagementClient>,
+    #[builder(
+        default = Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(),
+        setter(skip)
+    )]
+    kek_cache: Cache<String, SensitiveBytes>,
+    /// AES key size for DEK generation. Defaults to 128-bit.
+    #[builder(default = AesKeySize::default())]
+    key_size: AesKeySize,
+    /// Master key ID from table property `encryption.key-id`.
+    #[builder(setter(into))]
+    table_key_id: String,
+    /// All encryption keys from table metadata (KEKs and wrapped key metadata 
entries).
+    /// Newly created KEKs and wrapped manifest-list entries are inserted here 
so
+    /// callers can snapshot the full set at commit time via 
[`EncryptionManager::encryption_keys`].
+    #[builder(default = RwLock::new(HashMap::new()), via_mutators)]
+    encryption_keys: RwLock<HashMap<String, EncryptedKey>>,
+}
+
+impl fmt::Debug for EncryptionManager {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("EncryptionManager")
+            .field("key_size", &self.key_size)
+            .field("table_key_id", &self.table_key_id)
+            .finish_non_exhaustive()
+    }
+}
+
+impl EncryptionManager {
+    /// Encrypt a file with AGS1 stream encryption.
+    ///
+    /// Returns an [`EncryptedOutputFile`] that transparently encrypts on
+    /// write, along with key metadata for later decryption.
+    pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile {
+        let dek = SecureKey::generate(self.key_size);
+        let aad_prefix = Self::generate_aad_prefix();
+        let metadata = 
StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix);
+        EncryptedOutputFile::new(raw_output, metadata)
+    }
+
+    /// Wrap a manifest list key metadata with a KEK for storage in table 
metadata.
+    ///
+    /// Stores the resulting wrapped entry (and any newly created KEK) in the
+    /// manager's internal `encryption_keys` map. Callers persist the full set
+    /// at commit time via [`Self::encryption_keys`].
+    ///
+    /// Returns the `key_id` of the wrapped entry, which should be recorded on
+    /// the snapshot as `encryption_key_id` so readers can locate it later.
+    pub async fn encrypt_manifest_list_key_metadata(
+        &self,
+        key_metadata: &StandardKeyMetadata,
+    ) -> Result<String> {
+        let kek = match self.find_active_kek()? {
+            Some(existing) => existing,
+            None => self.create_kek().await?,
+        };
+
+        let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
+
+        // Use the KEK timestamp as AAD to prevent timestamp tampering attacks.
+        let aad = Self::kek_timestamp_aad(&kek)?;
+        let serialized = key_metadata.encode()?;
+        let wrapped_metadata = self.wrap_dek_with_kek(&serialized, &kek_bytes, 
Some(aad))?;
+
+        let wrapped_key = EncryptedKey::builder()
+            .key_id(Uuid::new_v4().to_string())
+            .encrypted_key_metadata(wrapped_metadata)
+            .encrypted_by_id(kek.key_id())
+            .build();
+
+        let wrapped_key_id = wrapped_key.key_id().to_string();
+        self.insert_encryption_key(wrapped_key);
+        Ok(wrapped_key_id)
+    }
+
+    /// Decrypt a manifest list key metadata previously wrapped via
+    /// [`Self::encrypt_manifest_list_key_metadata`].
+    ///
+    /// Looks up the entry by `encryption_key_id` (typically read from the
+    /// snapshot) in the manager's `encryption_keys` map.
+    pub async fn decrypt_manifest_list_key_metadata(
+        &self,
+        encryption_key_id: &str,
+    ) -> Result<StandardKeyMetadata> {
+        let encrypted_key = self
+            .encryption_keys
+            .read()
+            .expect("encryption_keys lock poisoned")
+            .get(encryption_key_id)
+            .cloned()
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Encryption key '{encryption_key_id}' not found"),
+                )
+            })?;
+
+        let kek_key_id = encrypted_key.encrypted_by_id().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "EncryptedKey '{}' has no encrypted_by_id",
+                    encrypted_key.key_id()
+                ),
+            )
+        })?;
+
+        let bytes = self
+            .decrypt_dek(kek_key_id, encrypted_key.encrypted_key_metadata())
+            .await?;
+
+        StandardKeyMetadata::decode(bytes.as_bytes())
+    }
+
+    /// Borrow the encryption keys held by this manager.
+    ///
+    /// Use at commit time to persist newly created KEKs and wrapped
+    /// manifest-list entries into `TableMetadata.encryption_keys`.
+    pub fn with_encryption_keys<F, R>(&self, f: F) -> R
+    where F: FnOnce(&HashMap<String, EncryptedKey>) -> R {
+        let keys = self
+            .encryption_keys
+            .read()
+            .expect("encryption_keys lock poisoned");
+        f(&keys)
+    }
+
+    fn insert_encryption_key(&self, key: EncryptedKey) {
+        self.encryption_keys
+            .write()
+            .expect("encryption_keys lock poisoned")
+            .insert(key.key_id().to_string(), key);
+    }
+
+    /// Create a new KEK, wrapped by the table's master key, and store it in
+    /// the manager's `encryption_keys` map.
+    async fn create_kek(&self) -> Result<EncryptedKey> {
+        let (plaintext_kek, wrapped_kek) = if 
self.kms_client.supports_key_generation() {
+            let result = 
self.kms_client.generate_key(&self.table_key_id).await?;
+            (result.key().clone(), result.wrapped_key().to_vec())
+        } else {
+            let plaintext_key = SecureKey::generate(self.key_size);
+            let wrapped = self
+                .kms_client
+                .wrap_key(plaintext_key.as_bytes(), &self.table_key_id)
+                .await?;
+
+            (SensitiveBytes::new(plaintext_key.as_bytes()), wrapped)
+        };
+
+        let key_id = Uuid::new_v4().to_string();
+        let now_ms = Utc::now().timestamp_millis();
+
+        let mut properties = HashMap::new();
+        properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), 
now_ms.to_string());
+
+        self.kek_cache.insert(key_id.clone(), plaintext_kek).await;
+
+        let kek = EncryptedKey::builder()
+            .key_id(key_id)
+            .encrypted_key_metadata(wrapped_kek)
+            .encrypted_by_id(&self.table_key_id)
+            .properties(properties)
+            .build();
+
+        self.insert_encryption_key(kek.clone());
+        Ok(kek)
+    }
+
+    /// Check whether a KEK has exceeded its configured lifespan (730 days per 
NIST SP 800-57).
+    fn is_kek_expired(&self, kek: &EncryptedKey) -> bool {
+        let created_at_ms = match kek
+            .properties()
+            .get(KEK_CREATED_AT_PROPERTY)
+            .and_then(|ts| ts.parse::<i64>().ok())
+        {
+            Some(ts) => ts,
+            None => return true, // No timestamp -> treat as expired
+        };
+
+        let now_ms = Utc::now().timestamp_millis();
+        let lifespan_ms = DEFAULT_KEK_LIFESPAN_DAYS * MILLIS_IN_DAY;
+        (now_ms - created_at_ms) >= lifespan_ms
+    }
+
+    /// Find the latest non-expired KEK for the table's master key.
+    fn find_active_kek(&self) -> Result<Option<EncryptedKey>> {
+        let keys = self
+            .encryption_keys
+            .read()
+            .expect("encryption_keys lock poisoned");
+        Ok(keys
+            .values()
+            .filter(|kek| {
+                kek.encrypted_by_id()
+                    .map(|id| id == self.table_key_id)
+                    .unwrap_or(false)
+                    && !self.is_kek_expired(kek)
+            })
+            .max_by_key(|kek| {
+                kek.properties()
+                    .get(KEK_CREATED_AT_PROPERTY)
+                    .and_then(|ts| ts.parse::<i64>().ok())
+                    .unwrap_or(0)
+            })
+            .cloned())
+    }
+
+    /// Unwrap a KEK using the KMS, with caching to avoid repeated calls.
+    async fn unwrap_key_encryption_key(&self, kek: &EncryptedKey) -> 
Result<SensitiveBytes> {
+        let cache_key = kek.key_id().to_string();
+
+        if let Some(cached) = self.kek_cache.get(&cache_key).await {
+            return Ok(cached);
+        }
+
+        let master_key_id = kek.encrypted_by_id().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("KEK '{}' has no encrypted_by_id", kek.key_id()),
+            )
+        })?;
+
+        let plaintext = self
+            .kms_client
+            .unwrap_key(kek.encrypted_key_metadata(), master_key_id)
+            .await?;
+
+        self.kek_cache.insert(cache_key, plaintext.clone()).await;
+
+        Ok(plaintext)
+    }
+
+    /// Decrypt a wrapped DEK using the KEK identified by `kek_key_id`,
+    /// looked up in the manager's own `encryption_keys` map.
+    async fn decrypt_dek(&self, kek_key_id: &str, wrapped_dek: &[u8]) -> 
Result<SensitiveBytes> {
+        let kek = self
+            .encryption_keys
+            .read()
+            .expect("encryption_keys lock poisoned")
+            .get(kek_key_id)
+            .cloned()
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("KEK not found in encryption keys: {kek_key_id}"),
+                )
+            })?;
+
+        // KEK timestamp as AAD prevents timestamp tampering.
+        let aad = Self::kek_timestamp_aad(&kek)?;
+
+        let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
+        self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, Some(aad))
+            .map_err(|e| {
+                Error::new(
+                    e.kind(),
+                    format!("Failed to unwrap key metadata with KEK 
'{kek_key_id}'"),
+                )
+                .with_source(e)
+            })
+    }
+
+    /// Extract the KEK timestamp for use as AAD. Returns an error if missing.
+    fn kek_timestamp_aad(kek: &EncryptedKey) -> Result<&[u8]> {
+        kek.properties()
+            .get(KEK_CREATED_AT_PROPERTY)
+            .map(|ts| ts.as_bytes())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "KEK '{}' is missing required '{}' property",
+                        kek.key_id(),
+                        KEK_CREATED_AT_PROPERTY
+                    ),
+                )
+            })
+    }
+
+    /// Generate a random AAD prefix for file encryption.
+    fn generate_aad_prefix() -> Box<[u8]> {
+        let mut prefix = vec![0u8; AAD_PREFIX_LENGTH];
+        OsRng.fill_bytes(&mut prefix);
+        prefix.into_boxed_slice()
+    }
+
+    /// Wrap a DEK with a KEK using local AES-GCM.
+    fn wrap_dek_with_kek(
+        &self,
+        dek: &[u8],
+        kek: &SensitiveBytes,
+        aad: Option<&[u8]>,
+    ) -> Result<Vec<u8>> {
+        let key = SecureKey::try_from(kek.clone())?;
+        let cipher = AesGcmCipher::new(key);
+        cipher.encrypt(dek, aad)
+    }
+
+    /// Unwrap a DEK with a KEK using local AES-GCM.
+    fn unwrap_dek_with_kek(
+        &self,
+        wrapped_dek: &[u8],
+        kek: &SensitiveBytes,
+        aad: Option<&[u8]>,
+    ) -> Result<SensitiveBytes> {
+        let key = SecureKey::try_from(kek.clone())?;
+        let cipher = AesGcmCipher::new(key);
+        cipher.decrypt(wrapped_dek, aad).map(SensitiveBytes::new)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::encryption::EncryptedInputFile;
+    use crate::encryption::kms::MemoryKeyManagementClient;
+
+    fn create_test_kms() -> Arc<dyn KeyManagementClient> {
+        let kms = MemoryKeyManagementClient::new();
+        kms.add_master_key("master-1").unwrap();
+        Arc::new(kms)
+    }
+
+    fn create_test_manager() -> EncryptionManager {
+        EncryptionManager::builder()
+            .kms_client(create_test_kms())
+            .table_key_id("master-1")
+            .build()
+    }
+
+    #[tokio::test]
+    async fn test_create_kek() {
+        let mgr = create_test_manager();
+        let kek = mgr.create_kek().await.unwrap();
+
+        assert!(!kek.key_id().is_empty());
+        assert!(!kek.encrypted_key_metadata().is_empty());
+        assert_eq!(kek.encrypted_by_id(), Some("master-1"));
+        assert!(kek.properties().contains_key(KEK_CREATED_AT_PROPERTY));
+    }
+
+    fn sample_key_metadata() -> StandardKeyMetadata {
+        
StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!")
+    }
+
+    #[tokio::test]
+    async fn test_wrap_unwrap_key_metadata_roundtrip() {
+        let mgr = create_test_manager();
+        let plaintext = sample_key_metadata();
+
+        let key_id = mgr
+            .encrypt_manifest_list_key_metadata(&plaintext)
+            .await
+            .unwrap();
+
+        // First wrap should create a new KEK and the wrapped entry — both 
stored on the manager
+        assert_eq!(mgr.with_encryption_keys(|k| k.len()), 2);
+
+        let decrypted = mgr
+            .decrypt_manifest_list_key_metadata(&key_id)
+            .await
+            .unwrap();
+        assert_eq!(decrypted, plaintext);
+    }
+
+    #[tokio::test]
+    async fn test_kek_reuse_when_not_expired() {
+        let mgr = create_test_manager();
+
+        // First wrap creates a new KEK + wrapped entry (2 keys)
+        let _id1 = mgr
+            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+            .await
+            .unwrap();
+        let kek_id = mgr.with_encryption_keys(|keys| {
+            assert_eq!(keys.len(), 2);
+            keys.values()
+                .find(|k| k.encrypted_by_id() == Some("master-1"))
+                .unwrap()
+                .key_id()
+                .to_string()
+        });
+
+        // Second wrap should reuse the existing KEK (only adds 1 new wrapped 
entry)
+        let id2 = mgr
+            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+            .await
+            .unwrap();
+        let entry2 = mgr.with_encryption_keys(|keys| {
+            assert_eq!(keys.len(), 3);
+            keys.get(&id2).cloned().unwrap()
+        });
+        assert_eq!(entry2.encrypted_by_id(), Some(kek_id.as_str()));
+    }
+
+    #[tokio::test]
+    async fn test_kek_rotation_when_expired() {
+        let kms = create_test_kms();
+
+        // Create a KEK with a timestamp 3 years in the past (exceeds 730-day 
lifespan)
+        let three_years_ago_ms = Utc::now().timestamp_millis() - (3 * 365 * 
MILLIS_IN_DAY);
+        let mut properties = HashMap::new();
+        properties.insert(
+            KEK_CREATED_AT_PROPERTY.to_string(),
+            three_years_ago_ms.to_string(),
+        );
+
+        // Wrap a real KEK so unwrap works if needed
+        let kek_key = SecureKey::generate(AesKeySize::Bits128);
+        let wrapped = kms.wrap_key(kek_key.as_bytes(), 
"master-1").await.unwrap();
+
+        let old_kek = EncryptedKey::builder()
+            .key_id("expired-kek")
+            .encrypted_key_metadata(wrapped)
+            .encrypted_by_id("master-1")
+            .properties(properties)
+            .build();
+
+        // Build manager with the expired KEK
+        let mgr = EncryptionManager::builder()
+            .kms_client(kms)
+            .table_key_id("master-1")
+            .add_encryption_key(old_kek.clone())
+            .build();
+
+        // Wrap should rotate to a new KEK since the existing one is expired
+        let new_entry_id = mgr
+            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+            .await
+            .unwrap();
+        let entry = mgr
+            .with_encryption_keys(|keys| keys.get(&new_entry_id).cloned())
+            .unwrap();
+        let used_kek_id = entry.encrypted_by_id().unwrap();
+        assert_ne!(used_kek_id, old_kek.key_id());
+    }
+
+    #[tokio::test]
+    async fn test_is_kek_expired_no_timestamp() {
+        let mgr = create_test_manager();
+
+        // KEK without a created-at timestamp -> treated as expired
+        let kek = EncryptedKey::builder()
+            .key_id("no-ts")
+            .encrypted_key_metadata(vec![0u8; 32])
+            .build();
+
+        assert!(mgr.is_kek_expired(&kek));
+    }
+
+    #[tokio::test]
+    async fn test_decrypt_with_unknown_key_id() {
+        let mgr = create_test_manager();
+        let result = 
mgr.decrypt_manifest_list_key_metadata("nonexistent").await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_kek_cache_hit() {
+        let mgr = create_test_manager();
+
+        // First wrap caches the plaintext KEK during create_kek().
+        let key_id = mgr
+            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+            .await
+            .unwrap();
+
+        // Decrypt unwraps the KEK; with the cache populated this should not 
hit KMS again.
+        let _ = mgr
+            .decrypt_manifest_list_key_metadata(&key_id)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_unwrap_fails_when_kek_missing_timestamp() {
+        let mgr = create_test_manager();
+
+        // Wrap some metadata to get a valid encrypted entry stored on the 
manager
+        let entry_id = mgr
+            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+            .await
+            .unwrap();
+
+        // Find the KEK that wrapped the entry and replace it with a copy that
+        // is missing the KEY_TIMESTAMP property, simulating a malformed table.
+        let mut keys = mgr.with_encryption_keys(|k| k.clone());
+        let kek_id = keys
+            .get(&entry_id)
+            .unwrap()
+            .encrypted_by_id()
+            .unwrap()
+            .to_string();
+        let kek = keys.remove(&kek_id).unwrap();
+        let kek_no_ts = EncryptedKey::builder()
+            .key_id(kek.key_id())
+            .encrypted_key_metadata(kek.encrypted_key_metadata())
+            .encrypted_by_id(kek.encrypted_by_id().unwrap())
+            .build();
+        keys.insert(kek_no_ts.key_id().to_string(), kek_no_ts);
+
+        let mgr = EncryptionManager::builder()
+            .kms_client(create_test_kms())
+            .table_key_id("master-1")
+            .encryption_keys(keys)
+            .build();
+
+        let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert!(
+            err.to_string().contains(KEK_CREATED_AT_PROPERTY),
+            "error should mention the missing property: {err}"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_unwrap_fails_when_kek_timestamp_tampered() {
+        let mgr = create_test_manager();
+
+        // Wrap metadata normally
+        let entry_id = mgr
+            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+            .await
+            .unwrap();
+
+        // Tamper with the KEK timestamp (change the AAD)
+        let mut keys = mgr.with_encryption_keys(|k| k.clone());
+        let kek_id = keys
+            .get(&entry_id)
+            .unwrap()
+            .encrypted_by_id()
+            .unwrap()
+            .to_string();
+        let kek = keys.remove(&kek_id).unwrap();
+        let mut tampered_properties = kek.properties().clone();
+        tampered_properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), 
"9999999".to_string());
+        let tampered_kek = EncryptedKey::builder()
+            .key_id(kek.key_id())
+            .encrypted_key_metadata(kek.encrypted_key_metadata())
+            .encrypted_by_id(kek.encrypted_by_id().unwrap())
+            .properties(tampered_properties)
+            .build();
+        keys.insert(tampered_kek.key_id().to_string(), tampered_kek);
+
+        let mgr = EncryptionManager::builder()
+            .kms_client(create_test_kms())
+            .table_key_id("master-1")
+            .encryption_keys(keys)
+            .build();
+
+        // Unwrap should fail because the AAD (timestamp) doesn't match what 
was used to wrap
+        let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
+        assert!(
+            result.is_err(),
+            "tampered timestamp should cause decryption failure"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_encrypt_decrypt_roundtrip() {
+        use crate::io::FileIO;
+
+        let io = FileIO::new_with_memory();
+        let path = "memory:///test/encrypt_roundtrip.bin";
+
+        let kms = MemoryKeyManagementClient::new();
+        kms.add_master_key("master-1").unwrap();
+        let mgr = EncryptionManager::builder()
+            .kms_client(Arc::new(kms) as Arc<dyn KeyManagementClient>)
+            .table_key_id("master-1")
+            .build();
+
+        let output = io.new_output(path).unwrap();
+        let encrypted_output = mgr.encrypt(output);
+
+        let plaintext = b"Hello, encrypted Iceberg round-trip!";
+        let serialized_metadata = 
encrypted_output.key_metadata().encode().unwrap();
+        encrypted_output
+            .write(bytes::Bytes::from(plaintext.to_vec()))
+            .await
+            .unwrap();
+
+        let input = io.new_input(path).unwrap();
+        let parsed_metadata = 
StandardKeyMetadata::decode(&serialized_metadata).unwrap();
+        let decrypted_file = EncryptedInputFile::new(input, parsed_metadata);
+
+        let content = decrypted_file.read().await.unwrap();
+        assert_eq!(&content[..], plaintext);
+    }
+}
diff --git a/crates/iceberg/src/encryption/mod.rs 
b/crates/iceberg/src/encryption/mod.rs
index 773d781d6..12ee76e5e 100644
--- a/crates/iceberg/src/encryption/mod.rs
+++ b/crates/iceberg/src/encryption/mod.rs
@@ -21,15 +21,15 @@
 //! for encrypting and decrypting data in Iceberg tables.
 
 mod crypto;
-mod file_decryptor;
-mod file_encryptor;
+pub(crate) mod io;
 pub(crate) mod key_metadata;
 pub mod kms;
+mod manager;
 mod stream;
 
 pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
-pub use file_decryptor::AesGcmFileDecryptor;
-pub use file_encryptor::AesGcmFileEncryptor;
+pub use io::{EncryptedInputFile, EncryptedOutputFile};
 pub use key_metadata::StandardKeyMetadata;
 pub use kms::{GeneratedKey, KeyManagementClient};
+pub use manager::EncryptionManager;
 pub use stream::{AesGcmFileRead, AesGcmFileWrite};

Reply via email to