This is an automated email from the ASF dual-hosted git repository.
blackmwk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ca2e6e336 feat(encryption) [5/N] Support encryption: Encryption Manager (#2383)
ca2e6e336 is described below
commit ca2e6e3365c32319f6172d4676c40f35fa039710
Author: Xander <[email protected]>
AuthorDate: Wed May 13 09:31:14 2026 +0100
feat(encryption) [5/N] Support encryption: Encryption Manager (#2383)
## Which issue does this PR close?
Part of https://github.com/apache/iceberg-rust/issues/2034
## What changes are included in this PR?
Stacked on #2340. Adds `EncryptionManager` — handles two-layer envelope
encryption (master key → KEK → DEK) for Iceberg tables.
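The two layers can be pictured as a chain of `encrypted_by_id` links stored in table metadata: a wrapped manifest-list entry points at the KEK that wrapped it, and the KEK points at the table's master key ID, which lives only in the KMS. The sketch below follows that chain using only `std`. `KeyEntry` and `resolve_chain` are hypothetical simplifications of the crate's `EncryptedKey`, not its API; the real manager does a fixed two-step lookup rather than a generic walk.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `EncryptedKey`: just enough to
/// follow the `encrypted_by_id` links between layers.
#[derive(Debug, Clone)]
struct KeyEntry {
    key_id: String,
    /// ID of the key that wrapped this entry (a KEK ID or the master key ID).
    encrypted_by_id: Option<String>,
}

/// Follows `encrypted_by_id` links from `start` until reaching an ID that is
/// not in `keys`, which is taken to be the KMS-held master key.
fn resolve_chain(keys: &HashMap<String, KeyEntry>, start: &str) -> Result<Vec<String>, String> {
    let mut chain = vec![start.to_string()];
    let mut current_id = start.to_string();
    loop {
        let entry = keys
            .get(&current_id)
            .ok_or_else(|| format!("key '{current_id}' not found"))?;
        let parent = entry
            .encrypted_by_id
            .clone()
            .ok_or_else(|| format!("key '{}' has no encrypted_by_id", entry.key_id))?;
        chain.push(parent.clone());
        if !keys.contains_key(&parent) {
            // Not stored in table metadata: this is the master key in the KMS.
            return Ok(chain);
        }
        if chain.len() > keys.len() + 1 {
            return Err("cycle in encrypted_by_id links".to_string());
        }
        current_id = parent;
    }
}
```

For a manifest-list entry `ml-1` wrapped by `kek-1`, which is in turn wrapped by the master key `master`, the chain resolves to `["ml-1", "kek-1", "master"]`.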
### New files
- **`manager.rs`**: `EncryptionManager`, constructed via `typed_builder`:
  - `encrypt()` wraps an `OutputFile` in an `EncryptedOutputFile` with a freshly generated DEK and AAD prefix
  - `encrypt_manifest_list_key_metadata()` / `decrypt_manifest_list_key_metadata()` perform the KEK envelope wrap/unwrap for manifest list key metadata
  - KEK lifecycle: creation, rotation (730-day NIST SP 800-57 lifespan), and caching (1-hour TTL via moka)
  - The builder exposes both `encryption_keys(HashMap)` (bulk, for loading from `TableMetadata` in production) and `add_encryption_key(EncryptedKey)` (one at a time, ergonomic for tests)
- **`io.rs`**: `EncryptedInputFile` / `EncryptedOutputFile` wrappers that hold a `StandardKeyMetadata` and lazily build the AGS1 stream cipher on `reader()` / `writer()`
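KEK rotation amounts to picking the newest non-expired KEK wrapped by the table's master key and creating a fresh one when none qualifies. A `std`-only sketch of that selection, modeled on `find_active_kek` / `is_kek_expired` in the diff. `Kek`, `new_kek`, and the explicit `now_ms` parameter are hypothetical simplifications for testability; the diff reads the clock via `Utc::now()` instead.

```rust
use std::collections::HashMap;

/// Property holding the KEK creation time in epoch millis (as in the diff).
const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP";
/// 730-day lifespan, per NIST SP 800-57.
const KEK_LIFESPAN_DAYS: i64 = 730;
const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000;

/// Hypothetical, simplified KEK record.
struct Kek {
    key_id: String,
    encrypted_by_id: Option<String>,
    properties: HashMap<String, String>,
}

/// Builds a KEK record stamped with `created_ms` (test helper).
fn new_kek(key_id: &str, encrypted_by_id: &str, created_ms: i64) -> Kek {
    let mut properties = HashMap::new();
    properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), created_ms.to_string());
    Kek {
        key_id: key_id.to_string(),
        encrypted_by_id: Some(encrypted_by_id.to_string()),
        properties,
    }
}

fn created_at_ms(kek: &Kek) -> Option<i64> {
    kek.properties
        .get(KEK_CREATED_AT_PROPERTY)
        .and_then(|ts| ts.parse::<i64>().ok())
}

/// A KEK with no parseable timestamp is treated as expired.
fn is_kek_expired(kek: &Kek, now_ms: i64) -> bool {
    match created_at_ms(kek) {
        Some(created) => now_ms - created >= KEK_LIFESPAN_DAYS * MILLIS_IN_DAY,
        None => true,
    }
}

/// Newest non-expired KEK wrapped by `table_key_id`; `None` means "create one".
fn find_active_kek<'a>(keys: &'a [Kek], table_key_id: &str, now_ms: i64) -> Option<&'a Kek> {
    keys.iter()
        .filter(|kek| {
            kek.encrypted_by_id.as_deref() == Some(table_key_id) && !is_kek_expired(kek, now_ms)
        })
        .max_by_key(|kek| created_at_ms(kek).unwrap_or(0))
}
```

Filtering on `encrypted_by_id` matters because the same map also holds wrapped manifest-list entries (whose parent is a KEK, not the master key) and KEKs for other master keys.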
### Design decisions
- **No `NativeEncryptedInputFile` / `NativeEncryptedOutputFile`
wrappers** — For PME, `StandardKeyMetadata` already carries the DEK and
AAD prefix. Wrapper types that just bundle an `InputFile`/`OutputFile`
with the same data are unnecessary indirection.
- **No `decrypt()` method on the manager** — it had no manager state to
use; callers do `StandardKeyMetadata::decode(bytes)` +
`EncryptedInputFile::new(input, metadata)` directly. Mirrors the fact
that Java's `decrypt()` is also a thin factory unrelated to manager
state.
- **No `AesGcmFileDecryptor` / `AesGcmFileEncryptor`** — the encrypted
file types store `StandardKeyMetadata` directly and construct the
underlying `AesGcmFileRead` / `AesGcmFileWrite` on demand. One fewer
layer.
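- **KEK timestamp AAD is required**: a missing timestamp fails with a clear `DataInvalid` error instead of silently falling back to `None` AAD (which would weaken the tampering defense), and a tampered timestamp no longer matches the AAD the key metadata was wrapped with, so AES-GCM authentication rejects it.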
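The "required" part is a small lookup that returns an error rather than `None`. A `std`-only sketch mirroring `kek_timestamp_aad` in the diff; taking the properties map directly and returning `String` errors are simplifications standing in for `EncryptedKey` and the crate's `Error` type.

```rust
use std::collections::HashMap;

const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP";

/// Returns the KEK creation timestamp bytes to use as AES-GCM AAD.
/// A missing property is an error; there is no fallback to "no AAD".
fn kek_timestamp_aad<'a>(
    key_id: &str,
    properties: &'a HashMap<String, String>,
) -> Result<&'a [u8], String> {
    properties
        .get(KEK_CREATED_AT_PROPERTY)
        .map(|ts| ts.as_bytes())
        .ok_or_else(|| {
            format!(
                "KEK '{}' is missing required '{}' property",
                key_id, KEK_CREATED_AT_PROPERTY
            )
        })
}
```

Because the returned bytes are fed to AES-GCM as associated data when wrapping and unwrapping, editing the stored timestamp (for example, to make an old KEK look fresh and dodge rotation) changes the AAD, and decryption fails authentication.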
### How this differs from Java's `StandardEncryptionManager`
- **KEK management is explicit**: Java's `addManifestListKeyMetadata()` mutates an internal map, and callers need to downcast to `StandardEncryptionManager` to access the keys. Here, `encrypt_manifest_list_key_metadata()` returns the wrapped entry's `key_id`, and the full key set (including any newly created KEK) is available through `EncryptionManager::with_encryption_keys()`, so no downcasting is needed. [Java reference](https://github.com/apache/iceberg/blame/main/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java#L146-L151).
- **No `EncryptionUtil` grab-bag**: Java needs a static utility class
(`decryptManifestListKeyMetadata`, `encryptManifestListKeyMetadata`,
etc.) because the interface is too narrow. Here those are just methods
on `EncryptionManager`.
- **`table_key_id` enforced at compile time**: required via
`typed_builder`, can't forget it. Unencrypted tables use
`Option<EncryptionManager>` instead of Java's
`PlaintextEncryptionManager`.
- **Config values match Java exactly**: KEK lifespan (730 days), cache
TTL (1 hour), AAD prefix length (16 bytes) are all hardcoded.
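The 1-hour TTL applies to unwrapped KEK bytes, which the manager keeps in a `moka::future::Cache` built with `time_to_live(DEFAULT_CACHE_TTL)` so that repeated wraps and unwraps skip the KMS round trip. The `std`-only sketch below illustrates time-to-live semantics (an entry expires a fixed duration after insertion). `TtlCache`, `insert_at`, and `get_at` are hypothetical; the explicit `now` argument makes expiry deterministic, and unlike moka this sketch never evicts expired entries.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// 1 hour, matching the PR's cache TTL for unwrapped KEKs.
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600);

/// Minimal time-to-live cache: an entry is visible until `ttl` after insertion.
struct TtlCache<V> {
    ttl: Duration,
    entries: HashMap<String, (Instant, V)>,
}

impl<V: Clone> TtlCache<V> {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Records `value` as inserted at time `now`.
    fn insert_at(&mut self, key: String, value: V, now: Instant) {
        self.entries.insert(key, (now, value));
    }

    /// Returns a clone of the value if it was inserted less than `ttl` before `now`.
    fn get_at(&self, key: &str, now: Instant) -> Option<V> {
        let (inserted, value) = self.entries.get(key)?;
        if now.duration_since(*inserted) < self.ttl {
            Some(value.clone())
        } else {
            None
        }
    }
}
```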
### Usage notes (for follow-up PRs that wire this in)
`load_manifest_list` will do something like the following, where `encryption_key_id` is the key ID recorded on the snapshot:
```rust
let metadata = em
    .decrypt_manifest_list_key_metadata(encryption_key_id)
    .await?;
let input = file_io.new_input(&self.manifest_list)?;
EncryptedInputFile::new(input, metadata).read().await?
```
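On the write side, the diff stores newly created KEKs and wrapped manifest-list entries in the manager's internal `RwLock<HashMap<..>>`, and commit code reads them back through `with_encryption_keys`, which lends the map to a closure instead of returning a lock guard. A `std`-only sketch of that accessor pattern; `KeyHolder`, `insert_key`, and the `String` values are hypothetical stand-ins for `EncryptionManager` and `EncryptedKey`.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical stand-in for the manager: only the shared key map.
struct KeyHolder {
    encryption_keys: RwLock<HashMap<String, String>>,
}

impl KeyHolder {
    /// Inserts under a short-lived write lock (like `insert_encryption_key`).
    fn insert_key(&self, key_id: &str, value: &str) {
        self.encryption_keys
            .write()
            .expect("encryption_keys lock poisoned")
            .insert(key_id.to_string(), value.to_string());
    }

    /// Lends the map to `f` under a read lock; the guard never escapes.
    fn with_encryption_keys<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&HashMap<String, String>) -> R,
    {
        let keys = self
            .encryption_keys
            .read()
            .expect("encryption_keys lock poisoned");
        f(&*keys)
    }
}
```

The closure form lets each caller decide whether to clone the whole map (for example, to copy it into new table metadata at commit) or just inspect it, without exposing `RwLock` in the public API.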
## Are these changes tested?
Yes — 11 manager tests covering KEK creation/rotation/caching,
wrap/unwrap roundtrips, AAD tampering and missing-timestamp rejection,
plus a full encrypt-then-read roundtrip via `EncryptedInputFile`.
---
crates/iceberg/src/encryption/crypto.rs | 9 +
crates/iceberg/src/encryption/file_decryptor.rs | 156 ------
crates/iceberg/src/encryption/file_encryptor.rs | 138 -----
crates/iceberg/src/encryption/io.rs | 220 ++++++++
crates/iceberg/src/encryption/manager.rs | 671 ++++++++++++++++++++++++
crates/iceberg/src/encryption/mod.rs | 8 +-
6 files changed, 904 insertions(+), 298 deletions(-)
diff --git a/crates/iceberg/src/encryption/crypto.rs b/crates/iceberg/src/encryption/crypto.rs
index 0f6a9eff4..02d085ebf 100644
--- a/crates/iceberg/src/encryption/crypto.rs
+++ b/crates/iceberg/src/encryption/crypto.rs
@@ -177,6 +177,15 @@ impl SecureKey {
}
}
+impl TryFrom<SensitiveBytes> for SecureKey {
+ type Error = Error;
+
+ fn try_from(key: SensitiveBytes) -> Result<Self> {
+ let key_size = AesKeySize::from_key_length(key.len())?;
+ Ok(Self { key, key_size })
+ }
+}
+
/// AES-GCM cipher for encrypting and decrypting data.
pub struct AesGcmCipher {
key: SensitiveBytes,
diff --git a/crates/iceberg/src/encryption/file_decryptor.rs b/crates/iceberg/src/encryption/file_decryptor.rs
deleted file mode 100644
index e44c0e1d7..000000000
--- a/crates/iceberg/src/encryption/file_decryptor.rs
+++ /dev/null
@@ -1,156 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! File-level decryption helper for AGS1 stream-encrypted files.
-
-use std::fmt;
-use std::sync::Arc;
-
-use super::crypto::{AesGcmCipher, SecureKey};
-use super::stream::AesGcmFileRead;
-use crate::Result;
-use crate::io::FileRead;
-
-/// Holds the decryption material for a single encrypted file.
-///
-/// Created from a plaintext DEK and AAD prefix, then used to wrap
-/// an encrypted file reader for transparent decryption on read.
-pub struct AesGcmFileDecryptor {
- cipher: Arc<AesGcmCipher>,
- aad_prefix: Box<[u8]>,
-}
-
-impl AesGcmFileDecryptor {
- /// Creates a new `AesGcmFileDecryptor` from a plaintext DEK and AAD prefix.
- pub fn new(dek: &[u8], aad_prefix: impl Into<Box<[u8]>>) -> Result<Self> {
- let key = SecureKey::new(dek)?;
- let cipher = Arc::new(AesGcmCipher::new(key));
- Ok(Self {
- cipher,
- aad_prefix: aad_prefix.into(),
- })
- }
-
- /// Wraps a raw encrypted-file reader in a decrypting [`AesGcmFileRead`].
- pub fn wrap_reader(
- &self,
- reader: Box<dyn FileRead>,
- encrypted_file_length: u64,
- ) -> Result<Box<dyn FileRead>> {
- let decrypting = AesGcmFileRead::new(
- reader,
- Arc::clone(&self.cipher),
- self.aad_prefix.clone(),
- encrypted_file_length,
- )?;
- Ok(Box::new(decrypting))
- }
-
- /// Calculates the plaintext length from an encrypted file's total length.
- pub fn plaintext_length(&self, encrypted_file_length: u64) -> Result<u64> {
- AesGcmFileRead::calculate_plaintext_length(encrypted_file_length)
- }
-}
-
-impl fmt::Debug for AesGcmFileDecryptor {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("AesGcmFileDecryptor")
- .field("aad_prefix_len", &self.aad_prefix.len())
- .finish_non_exhaustive()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::ops::Range;
-
- use bytes::Bytes;
-
- use super::*;
- use crate::encryption::AesGcmFileEncryptor;
- use crate::io::FileWrite;
-
- struct MemoryFileRead(Bytes);
-
- #[async_trait::async_trait]
- impl FileRead for MemoryFileRead {
- async fn read(&self, range: Range<u64>) -> Result<Bytes> {
- Ok(self.0.slice(range.start as usize..range.end as usize))
- }
- }
-
- struct MemoryFileWrite {
- buffer: std::sync::Arc<std::sync::Mutex<Vec<u8>>>,
- }
-
- #[async_trait::async_trait]
- impl FileWrite for MemoryFileWrite {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- self.buffer.lock().unwrap().extend_from_slice(&bs);
- Ok(())
- }
-
- async fn close(&mut self) -> Result<()> {
- Ok(())
- }
- }
-
- #[tokio::test]
- async fn test_wrap_reader_roundtrip() {
- let key = b"0123456789abcdef";
- let aad_prefix = b"test-aad-prefix!";
- let plaintext = b"Hello from file decryptor!";
-
- // Encrypt via the encryptor wrapper
- let encryptor = AesGcmFileEncryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap();
- let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
- let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite {
- buffer: buffer.clone(),
- }));
- writer.write(Bytes::from(plaintext.to_vec())).await.unwrap();
- writer.close().await.unwrap();
- let encrypted = buffer.lock().unwrap().clone();
- let encrypted_len = encrypted.len() as u64;
-
- // Decrypt via the decryptor wrapper
- let decryptor = AesGcmFileDecryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap();
- let reader = decryptor
- .wrap_reader(
- Box::new(MemoryFileRead(Bytes::from(encrypted))),
- encrypted_len,
- )
- .unwrap();
-
- let result = reader.read(0..plaintext.len() as u64).await.unwrap();
- assert_eq!(&result[..], plaintext);
- }
-
- #[tokio::test]
- async fn test_invalid_key_length() {
- let result = AesGcmFileDecryptor::new(b"too-short", b"aad".as_slice());
- assert!(result.is_err());
- }
-
- #[tokio::test]
- async fn test_plaintext_length() {
- let decryptor = AesGcmFileDecryptor::new(b"0123456789abcdef", b"aad".as_slice()).unwrap();
- // header(8) + nonce(12) + 10 bytes ciphertext + tag(16) = 46
- let encrypted_len = 8 + 12 + 10 + 16;
- let plain_len = decryptor.plaintext_length(encrypted_len).unwrap();
- assert_eq!(plain_len, 10);
- }
-}
diff --git a/crates/iceberg/src/encryption/file_encryptor.rs b/crates/iceberg/src/encryption/file_encryptor.rs
deleted file mode 100644
index 773438ad8..000000000
--- a/crates/iceberg/src/encryption/file_encryptor.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! File-level encryption helper for AGS1 stream-encrypted files.
-
-use std::fmt;
-use std::sync::Arc;
-
-use super::crypto::{AesGcmCipher, SecureKey};
-use super::stream::AesGcmFileWrite;
-use crate::Result;
-use crate::io::FileWrite;
-
-/// Holds the encryption material for a single encrypted file.
-///
-/// This is the write-side counterpart to
-/// [`AesGcmFileDecryptor`](super::AesGcmFileDecryptor). Created from
-/// a plaintext DEK and AAD prefix, then used to wrap an output writer
-/// for transparent encryption on write.
-pub struct AesGcmFileEncryptor {
- cipher: Arc<AesGcmCipher>,
- aad_prefix: Box<[u8]>,
-}
-
-impl AesGcmFileEncryptor {
- /// Creates a new `AesGcmFileEncryptor` from a plaintext DEK and AAD prefix.
- pub fn new(dek: &[u8], aad_prefix: impl Into<Box<[u8]>>) -> Result<Self> {
- let key = SecureKey::new(dek)?;
- let cipher = Arc::new(AesGcmCipher::new(key));
- Ok(Self {
- cipher,
- aad_prefix: aad_prefix.into(),
- })
- }
-
- /// Wraps a raw output writer in an encrypting [`AesGcmFileWrite`].
- pub fn wrap_writer(&self, writer: Box<dyn FileWrite>) -> Box<dyn FileWrite> {
- Box::new(AesGcmFileWrite::new(
- writer,
- Arc::clone(&self.cipher),
- self.aad_prefix.clone(),
- ))
- }
-}
-
-impl fmt::Debug for AesGcmFileEncryptor {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("AesGcmFileEncryptor")
- .field("aad_prefix_len", &self.aad_prefix.len())
- .finish_non_exhaustive()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::ops::Range;
-
- use bytes::Bytes;
-
- use super::*;
- use crate::encryption::AesGcmFileDecryptor;
- use crate::io::FileRead;
-
- struct MemoryFileRead(Bytes);
-
- #[async_trait::async_trait]
- impl FileRead for MemoryFileRead {
- async fn read(&self, range: Range<u64>) -> Result<Bytes> {
- Ok(self.0.slice(range.start as usize..range.end as usize))
- }
- }
-
- struct MemoryFileWrite {
- buffer: std::sync::Arc<std::sync::Mutex<Vec<u8>>>,
- }
-
- #[async_trait::async_trait]
- impl FileWrite for MemoryFileWrite {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- self.buffer.lock().unwrap().extend_from_slice(&bs);
- Ok(())
- }
-
- async fn close(&mut self) -> Result<()> {
- Ok(())
- }
- }
-
- #[tokio::test]
- async fn test_wrap_writer_roundtrip() {
- let key = b"0123456789abcdef";
- let aad_prefix = b"test-aad-prefix!";
- let plaintext = b"Hello from file encryptor!";
-
- // Encrypt via the encryptor wrapper
- let encryptor = AesGcmFileEncryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap();
- let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
- let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite {
- buffer: buffer.clone(),
- }));
- writer.write(Bytes::from(plaintext.to_vec())).await.unwrap();
- writer.close().await.unwrap();
- let encrypted = buffer.lock().unwrap().clone();
- let encrypted_len = encrypted.len() as u64;
-
- // Decrypt via the decryptor wrapper
- let decryptor = AesGcmFileDecryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap();
- let reader = decryptor
- .wrap_reader(
- Box::new(MemoryFileRead(Bytes::from(encrypted))),
- encrypted_len,
- )
- .unwrap();
-
- let result = reader.read(0..plaintext.len() as u64).await.unwrap();
- assert_eq!(&result[..], plaintext);
- }
-
- #[tokio::test]
- async fn test_invalid_key_length() {
- let result = AesGcmFileEncryptor::new(b"bad-key", b"aad".as_slice());
- assert!(result.is_err());
- }
-}
diff --git a/crates/iceberg/src/encryption/io.rs b/crates/iceberg/src/encryption/io.rs
new file mode 100644
index 000000000..c3d81dd85
--- /dev/null
+++ b/crates/iceberg/src/encryption/io.rs
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Encrypted file wrappers for InputFile / OutputFile.
+
+use std::sync::Arc;
+
+use bytes::Bytes;
+
+use super::crypto::{AesGcmCipher, SecureKey};
+use super::key_metadata::StandardKeyMetadata;
+use super::stream::{AesGcmFileRead, AesGcmFileWrite};
+use crate::Result;
+use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
+
+/// An AGS1 stream-encrypted input file wrapping a plain [`InputFile`].
+///
+/// Transparently decrypts on read.
+pub struct EncryptedInputFile {
+ inner: InputFile,
+ key_metadata: StandardKeyMetadata,
+}
+
+impl EncryptedInputFile {
+ /// Creates a new encrypted input file.
+ pub fn new(inner: InputFile, key_metadata: StandardKeyMetadata) -> Self {
+ Self {
+ inner,
+ key_metadata,
+ }
+ }
+
+ /// Absolute path of the file.
+ pub fn location(&self) -> &str {
+ self.inner.location()
+ }
+
+ /// Check if file exists.
+ pub async fn exists(&self) -> Result<bool> {
+ self.inner.exists().await
+ }
+
+ /// Fetch and returns metadata of file.
+ ///
+ /// The returned size is the **plaintext** size.
+ pub async fn metadata(&self) -> Result<FileMetadata> {
+ let raw_meta = self.inner.metadata().await?;
+ let plaintext_size = AesGcmFileRead::calculate_plaintext_length(raw_meta.size)?;
+ Ok(FileMetadata {
+ size: plaintext_size,
+ })
+ }
+
+ /// Reads and returns the whole content of the file as decrypted plaintext.
+ pub async fn read(&self) -> Result<Bytes> {
+ let meta = self.metadata().await?;
+ let reader = self.reader().await?;
+ reader.read(0..meta.size).await
+ }
+
+ /// Creates a reader that transparently decrypts on each read.
+ pub async fn reader(&self) -> Result<Box<dyn FileRead>> {
+ let raw_meta = self.inner.metadata().await?;
+ let raw_reader = self.inner.reader().await?;
+ let cipher = build_cipher(&self.key_metadata)?;
+ let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into();
+ let decrypting = AesGcmFileRead::new(raw_reader, cipher, aad_prefix, raw_meta.size)?;
+ Ok(Box::new(decrypting))
+ }
+
+ /// Returns a reference to the file's key metadata.
+ pub fn key_metadata(&self) -> &StandardKeyMetadata {
+ &self.key_metadata
+ }
+
+ /// Consumes self and returns the underlying plain input file.
+ pub fn into_inner(self) -> InputFile {
+ self.inner
+ }
+}
+
+impl std::fmt::Debug for EncryptedInputFile {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("EncryptedInputFile")
+ .field("path", &self.inner.location())
+ .finish_non_exhaustive()
+ }
+}
+
+/// An AGS1 stream-encrypted output file wrapping a plain [`OutputFile`].
+///
+/// Transparently encrypts on write.
+pub struct EncryptedOutputFile {
+ inner: OutputFile,
+ key_metadata: StandardKeyMetadata,
+}
+
+impl EncryptedOutputFile {
+ /// Creates a new encrypted output file.
+ pub fn new(inner: OutputFile, key_metadata: StandardKeyMetadata) -> Self {
+ Self {
+ inner,
+ key_metadata,
+ }
+ }
+
+ /// Returns a reference to the file's key metadata.
+ pub fn key_metadata(&self) -> &StandardKeyMetadata {
+ &self.key_metadata
+ }
+
+ /// Absolute path of the file.
+ pub fn location(&self) -> &str {
+ self.inner.location()
+ }
+
+ /// Creates a writer that transparently encrypts on each write.
+ pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+ let raw_writer = self.inner.writer().await?;
+ let cipher = build_cipher(&self.key_metadata)?;
+ let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into();
+ Ok(Box::new(AesGcmFileWrite::new(
+ raw_writer, cipher, aad_prefix,
+ )))
+ }
+
+ /// Write bytes to file (transparently encrypted).
+ pub async fn write(&self, bs: Bytes) -> Result<()> {
+ let mut writer = self.writer().await?;
+ writer.write(bs).await?;
+ writer.close().await
+ }
+
+ /// Deletes the underlying file.
+ pub async fn delete(&self) -> Result<()> {
+ self.inner.delete().await
+ }
+
+ /// Consumes self and returns the underlying plain output file.
+ pub fn into_inner(self) -> OutputFile {
+ self.inner
+ }
+}
+
+impl std::fmt::Debug for EncryptedOutputFile {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("EncryptedOutputFile")
+ .field("path", &self.inner.location())
+ .finish_non_exhaustive()
+ }
+}
+
+fn build_cipher(metadata: &StandardKeyMetadata) -> Result<Arc<AesGcmCipher>> {
+ let key = SecureKey::new(metadata.encryption_key().as_bytes())?;
+ Ok(Arc::new(AesGcmCipher::new(key)))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIO;
+
+ fn key_metadata() -> StandardKeyMetadata {
+ StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!")
+ }
+
+ #[tokio::test]
+ async fn test_write_read_roundtrip() {
+ let fileio = FileIO::new_with_memory();
+ let path = "memory:///test/io_roundtrip.bin";
+ let plaintext = b"Hello from EncryptedInputFile/EncryptedOutputFile!";
+
+ let output = EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata());
+ output.write(Bytes::from(plaintext.to_vec())).await.unwrap();
+
+ let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), key_metadata());
+ let content = input.read().await.unwrap();
+ assert_eq!(&content[..], plaintext);
+ }
+
+ #[tokio::test]
+ async fn test_metadata_returns_plaintext_size() {
+ let fileio = FileIO::new_with_memory();
+ let path = "memory:///test/io_metadata.bin";
+ let plaintext = b"some bytes to measure";
+
+ let output = EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata());
+ output.write(Bytes::from(plaintext.to_vec())).await.unwrap();
+
+ let raw_size = fileio
+ .new_input(path)
+ .unwrap()
+ .metadata()
+ .await
+ .unwrap()
+ .size;
+ assert!(
+ raw_size > plaintext.len() as u64,
+ "encrypted file should be larger than plaintext (header + nonce + tag)"
+ );
+
+ let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), key_metadata());
+ let meta = input.metadata().await.unwrap();
+ assert_eq!(meta.size, plaintext.len() as u64);
+ }
+}
diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs
new file mode 100644
index 000000000..a4c5b9c64
--- /dev/null
+++ b/crates/iceberg/src/encryption/manager.rs
@@ -0,0 +1,671 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Encryption manager for file-level encryption and two-layer envelope key management.
+//!
+//! [`EncryptionManager`] provides file-level `encrypt` and manifest list key
+//! metadata wrap/unwrap, modeled on Java's `org.apache.iceberg.encryption.EncryptionManager`,
+//! using envelope encryption:
+//! - A master key (in KMS) wraps a Key Encryption Key (KEK)
+//! - The KEK wraps Data Encryption Keys (DEKs) locally
+
+use std::collections::HashMap;
+use std::fmt;
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
+
+use aes_gcm::aead::OsRng;
+use aes_gcm::aead::rand_core::RngCore;
+use chrono::Utc;
+use moka::future::Cache;
+use uuid::Uuid;
+
+const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000;
+
+use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
+use super::io::EncryptedOutputFile;
+use super::key_metadata::StandardKeyMetadata;
+use super::kms::KeyManagementClient;
+use crate::io::OutputFile;
+use crate::spec::EncryptedKey;
+use crate::{Error, ErrorKind, Result};
+
+/// Property key for the KEK creation timestamp (milliseconds since epoch).
+/// Matches Java's `StandardEncryptionManager.KEY_TIMESTAMP`.
+pub const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP";
+
+/// Default KEK lifespan in days, per NIST SP 800-57.
+const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730;
+
+/// Default cache TTL for unwrapped KEKs.
+const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600);
+
+/// Default AAD prefix length in bytes.
+/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`.
+const AAD_PREFIX_LENGTH: usize = 16;
+
+/// File-level encryption manager using two-layer envelope encryption.
+///
+/// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls.
+#[derive(typed_builder::TypedBuilder)]
+#[builder(mutators(
+ /// Add an encryption key (KEK or wrapped key metadata entry).
+ pub fn add_encryption_key(&mut self, key: EncryptedKey) {
+ self.encryption_keys
+ .write()
+ .expect("encryption_keys lock poisoned")
+ .insert(key.key_id().to_string(), key);
+ }
+ /// Set all encryption keys from table metadata.
+ pub fn encryption_keys(&mut self, keys: HashMap<String, EncryptedKey>) {
+ self.encryption_keys = RwLock::new(keys);
+ }
+))]
+pub struct EncryptionManager {
+ kms_client: Arc<dyn KeyManagementClient>,
+ #[builder(
+ default = Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(),
+ setter(skip)
+ )]
+ kek_cache: Cache<String, SensitiveBytes>,
+ /// AES key size for DEK generation. Defaults to 128-bit.
+ #[builder(default = AesKeySize::default())]
+ key_size: AesKeySize,
+ /// Master key ID from table property `encryption.key-id`.
+ #[builder(setter(into))]
+ table_key_id: String,
+ /// All encryption keys from table metadata (KEKs and wrapped key metadata entries).
+ /// Newly created KEKs and wrapped manifest-list entries are inserted here so
+ /// callers can snapshot the full set at commit time via [`EncryptionManager::with_encryption_keys`].
+ #[builder(default = RwLock::new(HashMap::new()), via_mutators)]
+ encryption_keys: RwLock<HashMap<String, EncryptedKey>>,
+}
+
+impl fmt::Debug for EncryptionManager {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("EncryptionManager")
+ .field("key_size", &self.key_size)
+ .field("table_key_id", &self.table_key_id)
+ .finish_non_exhaustive()
+ }
+}
+
+impl EncryptionManager {
+ /// Encrypt a file with AGS1 stream encryption.
+ ///
+ /// Returns an [`EncryptedOutputFile`] that transparently encrypts on
+ /// write. Its [`EncryptedOutputFile::key_metadata`] carries the generated DEK and AAD prefix needed for later decryption.
+ pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile {
+ let dek = SecureKey::generate(self.key_size);
+ let aad_prefix = Self::generate_aad_prefix();
+ let metadata =
StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix);
+ EncryptedOutputFile::new(raw_output, metadata)
+ }
+
+ /// Wrap a manifest list key metadata with a KEK for storage in table metadata.
+ ///
+ /// Stores the resulting wrapped entry (and any newly created KEK) in the
+ /// manager's internal `encryption_keys` map. Callers persist the full set
+ /// at commit time via [`Self::with_encryption_keys`].
+ ///
+ /// Returns the `key_id` of the wrapped entry, which should be recorded on
+ /// the snapshot as `encryption_key_id` so readers can locate it later.
+ pub async fn encrypt_manifest_list_key_metadata(
+ &self,
+ key_metadata: &StandardKeyMetadata,
+ ) -> Result<String> {
+ let kek = match self.find_active_kek()? {
+ Some(existing) => existing,
+ None => self.create_kek().await?,
+ };
+
+ let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
+
+ // Use the KEK timestamp as AAD to prevent timestamp tampering attacks.
+ let aad = Self::kek_timestamp_aad(&kek)?;
+ let serialized = key_metadata.encode()?;
+ let wrapped_metadata = self.wrap_dek_with_kek(&serialized, &kek_bytes, Some(aad))?;
+
+ let wrapped_key = EncryptedKey::builder()
+ .key_id(Uuid::new_v4().to_string())
+ .encrypted_key_metadata(wrapped_metadata)
+ .encrypted_by_id(kek.key_id())
+ .build();
+
+ let wrapped_key_id = wrapped_key.key_id().to_string();
+ self.insert_encryption_key(wrapped_key);
+ Ok(wrapped_key_id)
+ }
+
+ /// Decrypt a manifest list key metadata previously wrapped via
+ /// [`Self::encrypt_manifest_list_key_metadata`].
+ ///
+ /// Looks up the entry by `encryption_key_id` (typically read from the
+ /// snapshot) in the manager's `encryption_keys` map.
+ pub async fn decrypt_manifest_list_key_metadata(
+ &self,
+ encryption_key_id: &str,
+ ) -> Result<StandardKeyMetadata> {
+ let encrypted_key = self
+ .encryption_keys
+ .read()
+ .expect("encryption_keys lock poisoned")
+ .get(encryption_key_id)
+ .cloned()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Encryption key '{encryption_key_id}' not found"),
+ )
+ })?;
+
+ let kek_key_id = encrypted_key.encrypted_by_id().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "EncryptedKey '{}' has no encrypted_by_id",
+ encrypted_key.key_id()
+ ),
+ )
+ })?;
+
+ let bytes = self
+ .decrypt_dek(kek_key_id, encrypted_key.encrypted_key_metadata())
+ .await?;
+
+ StandardKeyMetadata::decode(bytes.as_bytes())
+ }
+
+ /// Borrow the encryption keys held by this manager.
+ ///
+ /// Use at commit time to persist newly created KEKs and wrapped
+ /// manifest-list entries into `TableMetadata.encryption_keys`.
+ pub fn with_encryption_keys<F, R>(&self, f: F) -> R
+ where F: FnOnce(&HashMap<String, EncryptedKey>) -> R {
+ let keys = self
+ .encryption_keys
+ .read()
+ .expect("encryption_keys lock poisoned");
+ f(&keys)
+ }
+
+ fn insert_encryption_key(&self, key: EncryptedKey) {
+ self.encryption_keys
+ .write()
+ .expect("encryption_keys lock poisoned")
+ .insert(key.key_id().to_string(), key);
+ }
+
+ /// Create a new KEK, wrapped by the table's master key, and store it in
+ /// the manager's `encryption_keys` map.
+ async fn create_kek(&self) -> Result<EncryptedKey> {
+ let (plaintext_kek, wrapped_kek) = if self.kms_client.supports_key_generation() {
+ let result = self.kms_client.generate_key(&self.table_key_id).await?;
+ (result.key().clone(), result.wrapped_key().to_vec())
+ } else {
+ let plaintext_key = SecureKey::generate(self.key_size);
+ let wrapped = self
+ .kms_client
+ .wrap_key(plaintext_key.as_bytes(), &self.table_key_id)
+ .await?;
+
+ (SensitiveBytes::new(plaintext_key.as_bytes()), wrapped)
+ };
+
+ let key_id = Uuid::new_v4().to_string();
+ let now_ms = Utc::now().timestamp_millis();
+
+ let mut properties = HashMap::new();
+ properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), now_ms.to_string());
+
+ self.kek_cache.insert(key_id.clone(), plaintext_kek).await;
+
+ let kek = EncryptedKey::builder()
+ .key_id(key_id)
+ .encrypted_key_metadata(wrapped_kek)
+ .encrypted_by_id(&self.table_key_id)
+ .properties(properties)
+ .build();
+
+ self.insert_encryption_key(kek.clone());
+ Ok(kek)
+ }
+
+ /// Check whether a KEK has exceeded its configured lifespan (730 days per NIST SP 800-57).
+ fn is_kek_expired(&self, kek: &EncryptedKey) -> bool {
+ let created_at_ms = match kek
+ .properties()
+ .get(KEK_CREATED_AT_PROPERTY)
+ .and_then(|ts| ts.parse::<i64>().ok())
+ {
+ Some(ts) => ts,
+ None => return true, // No timestamp -> treat as expired
+ };
+
+ let now_ms = Utc::now().timestamp_millis();
+ let lifespan_ms = DEFAULT_KEK_LIFESPAN_DAYS * MILLIS_IN_DAY;
+ (now_ms - created_at_ms) >= lifespan_ms
+ }
+
+ /// Find the latest non-expired KEK for the table's master key.
+ fn find_active_kek(&self) -> Result<Option<EncryptedKey>> {
+ let keys = self
+ .encryption_keys
+ .read()
+ .expect("encryption_keys lock poisoned");
+ Ok(keys
+ .values()
+ .filter(|kek| {
+ kek.encrypted_by_id()
+ .map(|id| id == self.table_key_id)
+ .unwrap_or(false)
+ && !self.is_kek_expired(kek)
+ })
+ .max_by_key(|kek| {
+ kek.properties()
+ .get(KEK_CREATED_AT_PROPERTY)
+ .and_then(|ts| ts.parse::<i64>().ok())
+ .unwrap_or(0)
+ })
+ .cloned())
+ }
+
+ /// Unwrap a KEK using the KMS, with caching to avoid repeated calls.
+ async fn unwrap_key_encryption_key(&self, kek: &EncryptedKey) -> Result<SensitiveBytes> {
+ let cache_key = kek.key_id().to_string();
+
+ if let Some(cached) = self.kek_cache.get(&cache_key).await {
+ return Ok(cached);
+ }
+
+ let master_key_id = kek.encrypted_by_id().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("KEK '{}' has no encrypted_by_id", kek.key_id()),
+ )
+ })?;
+
+ let plaintext = self
+ .kms_client
+ .unwrap_key(kek.encrypted_key_metadata(), master_key_id)
+ .await?;
+
+ self.kek_cache.insert(cache_key, plaintext.clone()).await;
+
+ Ok(plaintext)
+ }
+
+ /// Decrypt a wrapped DEK using the KEK identified by `kek_key_id`,
+ /// looked up in the manager's own `encryption_keys` map.
+ async fn decrypt_dek(&self, kek_key_id: &str, wrapped_dek: &[u8]) -> Result<SensitiveBytes> {
+ let kek = self
+ .encryption_keys
+ .read()
+ .expect("encryption_keys lock poisoned")
+ .get(kek_key_id)
+ .cloned()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("KEK not found in encryption keys: {kek_key_id}"),
+ )
+ })?;
+
+ // KEK timestamp as AAD prevents timestamp tampering.
+ let aad = Self::kek_timestamp_aad(&kek)?;
+
+ let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
+ self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, Some(aad))
+ .map_err(|e| {
+ Error::new(
+ e.kind(),
+ format!("Failed to unwrap key metadata with KEK '{kek_key_id}'"),
+ )
+ .with_source(e)
+ })
+ }
+
+ /// Extract the KEK timestamp for use as AAD. Returns an error if missing.
+ fn kek_timestamp_aad(kek: &EncryptedKey) -> Result<&[u8]> {
+ kek.properties()
+ .get(KEK_CREATED_AT_PROPERTY)
+ .map(|ts| ts.as_bytes())
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "KEK '{}' is missing required '{}' property",
+ kek.key_id(),
+ KEK_CREATED_AT_PROPERTY
+ ),
+ )
+ })
+ }
+
+ /// Generate a random AAD prefix for file encryption.
+ fn generate_aad_prefix() -> Box<[u8]> {
+ let mut prefix = vec![0u8; AAD_PREFIX_LENGTH];
+ OsRng.fill_bytes(&mut prefix);
+ prefix.into_boxed_slice()
+ }
+
+ /// Wrap a DEK with a KEK using local AES-GCM.
+ fn wrap_dek_with_kek(
+ &self,
+ dek: &[u8],
+ kek: &SensitiveBytes,
+ aad: Option<&[u8]>,
+ ) -> Result<Vec<u8>> {
+ let key = SecureKey::try_from(kek.clone())?;
+ let cipher = AesGcmCipher::new(key);
+ cipher.encrypt(dek, aad)
+ }
+
+ /// Unwrap a DEK with a KEK using local AES-GCM.
+ fn unwrap_dek_with_kek(
+ &self,
+ wrapped_dek: &[u8],
+ kek: &SensitiveBytes,
+ aad: Option<&[u8]>,
+ ) -> Result<SensitiveBytes> {
+ let key = SecureKey::try_from(kek.clone())?;
+ let cipher = AesGcmCipher::new(key);
+ cipher.decrypt(wrapped_dek, aad).map(SensitiveBytes::new)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::encryption::EncryptedInputFile;
+ use crate::encryption::kms::MemoryKeyManagementClient;
+
+ fn create_test_kms() -> Arc<dyn KeyManagementClient> {
+ let kms = MemoryKeyManagementClient::new();
+ kms.add_master_key("master-1").unwrap();
+ Arc::new(kms)
+ }
+
+ fn create_test_manager() -> EncryptionManager {
+ EncryptionManager::builder()
+ .kms_client(create_test_kms())
+ .table_key_id("master-1")
+ .build()
+ }
+
+ #[tokio::test]
+ async fn test_create_kek() {
+ let mgr = create_test_manager();
+ let kek = mgr.create_kek().await.unwrap();
+
+ assert!(!kek.key_id().is_empty());
+ assert!(!kek.encrypted_key_metadata().is_empty());
+ assert_eq!(kek.encrypted_by_id(), Some("master-1"));
+ assert!(kek.properties().contains_key(KEK_CREATED_AT_PROPERTY));
+ }
+
+ fn sample_key_metadata() -> StandardKeyMetadata {
+ StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!")
+ }
+
+ #[tokio::test]
+ async fn test_wrap_unwrap_key_metadata_roundtrip() {
+ let mgr = create_test_manager();
+ let plaintext = sample_key_metadata();
+
+ let key_id = mgr
+ .encrypt_manifest_list_key_metadata(&plaintext)
+ .await
+ .unwrap();
+
+ // First wrap should create a new KEK and the wrapped entry, both stored on the manager
+ assert_eq!(mgr.with_encryption_keys(|k| k.len()), 2);
+
+ let decrypted = mgr
+ .decrypt_manifest_list_key_metadata(&key_id)
+ .await
+ .unwrap();
+ assert_eq!(decrypted, plaintext);
+ }
+
+ #[tokio::test]
+ async fn test_kek_reuse_when_not_expired() {
+ let mgr = create_test_manager();
+
+ // First wrap creates a new KEK + wrapped entry (2 keys)
+ let _id1 = mgr
+ .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+ .await
+ .unwrap();
+ let kek_id = mgr.with_encryption_keys(|keys| {
+ assert_eq!(keys.len(), 2);
+ keys.values()
+ .find(|k| k.encrypted_by_id() == Some("master-1"))
+ .unwrap()
+ .key_id()
+ .to_string()
+ });
+
+ // Second wrap should reuse the existing KEK (only adds 1 new wrapped entry)
+ let id2 = mgr
+ .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+ .await
+ .unwrap();
+ let entry2 = mgr.with_encryption_keys(|keys| {
+ assert_eq!(keys.len(), 3);
+ keys.get(&id2).cloned().unwrap()
+ });
+ assert_eq!(entry2.encrypted_by_id(), Some(kek_id.as_str()));
+ }
+
+ #[tokio::test]
+ async fn test_kek_rotation_when_expired() {
+ let kms = create_test_kms();
+
+ // Create a KEK with a timestamp 3 years in the past (exceeds 730-day lifespan)
+ let three_years_ago_ms = Utc::now().timestamp_millis() - (3 * 365 * MILLIS_IN_DAY);
+ let mut properties = HashMap::new();
+ properties.insert(
+ KEK_CREATED_AT_PROPERTY.to_string(),
+ three_years_ago_ms.to_string(),
+ );
+
+ // Wrap a real KEK so unwrap works if needed
+ let kek_key = SecureKey::generate(AesKeySize::Bits128);
+ let wrapped = kms.wrap_key(kek_key.as_bytes(), "master-1").await.unwrap();
+
+ let old_kek = EncryptedKey::builder()
+ .key_id("expired-kek")
+ .encrypted_key_metadata(wrapped)
+ .encrypted_by_id("master-1")
+ .properties(properties)
+ .build();
+
+ // Build manager with the expired KEK
+ let mgr = EncryptionManager::builder()
+ .kms_client(kms)
+ .table_key_id("master-1")
+ .add_encryption_key(old_kek.clone())
+ .build();
+
+ // Wrap should rotate to a new KEK since the existing one is expired
+ let new_entry_id = mgr
+ .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+ .await
+ .unwrap();
+ let entry = mgr
+ .with_encryption_keys(|keys| keys.get(&new_entry_id).cloned())
+ .unwrap();
+ let used_kek_id = entry.encrypted_by_id().unwrap();
+ assert_ne!(used_kek_id, old_kek.key_id());
+ }
+
+ #[tokio::test]
+ async fn test_is_kek_expired_no_timestamp() {
+ let mgr = create_test_manager();
+
+ // KEK without a created-at timestamp -> treated as expired
+ let kek = EncryptedKey::builder()
+ .key_id("no-ts")
+ .encrypted_key_metadata(vec![0u8; 32])
+ .build();
+
+ assert!(mgr.is_kek_expired(&kek));
+ }
+
+ #[tokio::test]
+ async fn test_decrypt_with_unknown_key_id() {
+ let mgr = create_test_manager();
+ let result = mgr.decrypt_manifest_list_key_metadata("nonexistent").await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_kek_cache_hit() {
+ let mgr = create_test_manager();
+
+ // First wrap caches the plaintext KEK during create_kek().
+ let key_id = mgr
+ .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+ .await
+ .unwrap();
+
+ // Decrypt unwraps the KEK; with the cache populated this should not hit KMS again.
+ let _ = mgr
+ .decrypt_manifest_list_key_metadata(&key_id)
+ .await
+ .unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_unwrap_fails_when_kek_missing_timestamp() {
+ let mgr = create_test_manager();
+
+ // Wrap some metadata to get a valid encrypted entry stored on the manager
+ let entry_id = mgr
+ .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+ .await
+ .unwrap();
+
+ // Find the KEK that wrapped the entry and replace it with a copy that
+ // is missing the `KEK_CREATED_AT_PROPERTY` timestamp, simulating a malformed table.
+ let mut keys = mgr.with_encryption_keys(|k| k.clone());
+ let kek_id = keys
+ .get(&entry_id)
+ .unwrap()
+ .encrypted_by_id()
+ .unwrap()
+ .to_string();
+ let kek = keys.remove(&kek_id).unwrap();
+ let kek_no_ts = EncryptedKey::builder()
+ .key_id(kek.key_id())
+ .encrypted_key_metadata(kek.encrypted_key_metadata())
+ .encrypted_by_id(kek.encrypted_by_id().unwrap())
+ .build();
+ keys.insert(kek_no_ts.key_id().to_string(), kek_no_ts);
+
+ let mgr = EncryptionManager::builder()
+ .kms_client(create_test_kms())
+ .table_key_id("master-1")
+ .encryption_keys(keys)
+ .build();
+
+ let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::DataInvalid);
+ assert!(
+ err.to_string().contains(KEK_CREATED_AT_PROPERTY),
+ "error should mention the missing property: {err}"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_unwrap_fails_when_kek_timestamp_tampered() {
+ let mgr = create_test_manager();
+
+ // Wrap metadata normally
+ let entry_id = mgr
+ .encrypt_manifest_list_key_metadata(&sample_key_metadata())
+ .await
+ .unwrap();
+
+ // Tamper with the KEK timestamp (change the AAD)
+ let mut keys = mgr.with_encryption_keys(|k| k.clone());
+ let kek_id = keys
+ .get(&entry_id)
+ .unwrap()
+ .encrypted_by_id()
+ .unwrap()
+ .to_string();
+ let kek = keys.remove(&kek_id).unwrap();
+ let mut tampered_properties = kek.properties().clone();
+ tampered_properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), "9999999".to_string());
+ let tampered_kek = EncryptedKey::builder()
+ .key_id(kek.key_id())
+ .encrypted_key_metadata(kek.encrypted_key_metadata())
+ .encrypted_by_id(kek.encrypted_by_id().unwrap())
+ .properties(tampered_properties)
+ .build();
+ keys.insert(tampered_kek.key_id().to_string(), tampered_kek);
+
+ let mgr = EncryptionManager::builder()
+ .kms_client(create_test_kms())
+ .table_key_id("master-1")
+ .encryption_keys(keys)
+ .build();
+
+ // Unwrap should fail because the AAD (timestamp) doesn't match what was used to wrap
+ let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
+ assert!(
+ result.is_err(),
+ "tampered timestamp should cause decryption failure"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_encrypt_decrypt_roundtrip() {
+ use crate::io::FileIO;
+
+ let io = FileIO::new_with_memory();
+ let path = "memory:///test/encrypt_roundtrip.bin";
+
+ let kms = MemoryKeyManagementClient::new();
+ kms.add_master_key("master-1").unwrap();
+ let mgr = EncryptionManager::builder()
+ .kms_client(Arc::new(kms) as Arc<dyn KeyManagementClient>)
+ .table_key_id("master-1")
+ .build();
+
+ let output = io.new_output(path).unwrap();
+ let encrypted_output = mgr.encrypt(output);
+
+ let plaintext = b"Hello, encrypted Iceberg round-trip!";
+ let serialized_metadata = encrypted_output.key_metadata().encode().unwrap();
+ encrypted_output
+ .write(bytes::Bytes::from(plaintext.to_vec()))
+ .await
+ .unwrap();
+
+ let input = io.new_input(path).unwrap();
+ let parsed_metadata = StandardKeyMetadata::decode(&serialized_metadata).unwrap();
+ let decrypted_file = EncryptedInputFile::new(input, parsed_metadata);
+
+ let content = decrypted_file.read().await.unwrap();
+ assert_eq!(&content[..], plaintext);
+ }
+}
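The KEK reuse and rotation rule implemented by `is_kek_expired` and `find_active_kek` in `manager.rs` can be sketched in isolation. A KEK is eligible only under two conditions: it must be wrapped by the table's master key, and it must be younger than 730 days. A missing timestamp counts as expired. Among eligible KEKs, the newest wins, and if none qualify the manager creates a fresh one.

This is a minimal sketch, not the crate's API. It assumes a hypothetical, simplified `Kek` struct that stores the creation time as `Option<i64>` milliseconds. The real `EncryptedKey` keeps it as a string property under `KEK_CREATED_AT_PROPERTY` and parses it, so an unparseable value behaves like a missing one.

```rust
use std::collections::HashMap;

// Lifespan and day length mirroring the constants described in the commit.
const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730;
const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000;

/// Hypothetical, simplified stand-in for `EncryptedKey` (illustration only).
#[derive(Clone, Debug)]
struct Kek {
    key_id: String,
    encrypted_by_id: Option<String>,
    created_at_ms: Option<i64>,
}

/// A KEK without a creation timestamp is treated as expired.
fn is_expired(kek: &Kek, now_ms: i64) -> bool {
    match kek.created_at_ms {
        Some(created) => now_ms - created >= DEFAULT_KEK_LIFESPAN_DAYS * MILLIS_IN_DAY,
        None => true,
    }
}

/// Newest non-expired KEK wrapped by `table_key_id`; `None` means "rotate".
fn find_active_kek<'a>(
    keys: &'a HashMap<String, Kek>,
    table_key_id: &str,
    now_ms: i64,
) -> Option<&'a Kek> {
    keys.values()
        .filter(|k| k.encrypted_by_id.as_deref() == Some(table_key_id) && !is_expired(k, now_ms))
        .max_by_key(|k| k.created_at_ms.unwrap_or(0))
}

fn main() {
    // Fixed "current time" so the demo is reproducible.
    let now_ms: i64 = 1_800_000_000_000;
    let mut keys = HashMap::new();
    for (id, age_days) in [("old", 1000), ("mid", 400), ("new", 10)] {
        let kek = Kek {
            key_id: id.to_string(),
            encrypted_by_id: Some("master-1".to_string()),
            created_at_ms: Some(now_ms - age_days * MILLIS_IN_DAY),
        };
        keys.insert(kek.key_id.clone(), kek);
    }
    // "old" is past the lifespan, so the newest remaining KEK is reused.
    match find_active_kek(&keys, "master-1", now_ms) {
        Some(kek) => println!("reuse KEK {}", kek.key_id), // prints "reuse KEK new"
        None => println!("no active KEK: rotate"),
    }
}
```

Selecting the newest eligible KEK, rather than any eligible one, means new manifest-list key metadata always lands on the most recently created KEK after a rotation.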
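`unwrap_key_encryption_key` in `manager.rs` follows a cache-aside pattern. It looks up the plaintext KEK by key ID and calls the KMS only on a miss, then stores the result. `create_kek` also seeds the cache with the freshly generated KEK, which is why `test_kek_cache_hit` expects decryption to avoid a second KMS round trip.

The sketch below is synchronous and illustrative only. It swaps the async, 1-hour-TTL moka cache for a plain `HashMap` with no expiry. It also replaces a real KMS with a hypothetical `MockKms` that "unwraps" by XOR-ing bytes (not cryptography) and counts its calls.

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;

/// Hypothetical KMS stand-in: XORs bytes instead of decrypting and counts calls.
struct MockKms {
    calls: Cell<usize>,
}

impl MockKms {
    fn unwrap_key(&self, wrapped: &[u8]) -> Vec<u8> {
        self.calls.set(self.calls.get() + 1);
        wrapped.iter().map(|b| b ^ 0x5A).collect()
    }
}

/// Cache-aside KEK unwrapping keyed by KEK id (no TTL in this sketch).
struct KekUnwrapper {
    kms: MockKms,
    cache: RefCell<HashMap<String, Vec<u8>>>,
}

impl KekUnwrapper {
    fn unwrap_kek(&self, key_id: &str, wrapped: &[u8]) -> Vec<u8> {
        // Cache hit: return the plaintext without contacting the KMS.
        if let Some(hit) = self.cache.borrow().get(key_id) {
            return hit.clone();
        }
        // Cache miss: unwrap via the KMS, then populate the cache.
        let plaintext = self.kms.unwrap_key(wrapped);
        self.cache
            .borrow_mut()
            .insert(key_id.to_string(), plaintext.clone());
        plaintext
    }
}

fn main() {
    let unwrapper = KekUnwrapper {
        kms: MockKms { calls: Cell::new(0) },
        cache: RefCell::new(HashMap::new()),
    };
    let wrapped = [0x11u8, 0x22, 0x33];
    let first = unwrapper.unwrap_kek("kek-1", &wrapped); // miss: calls the KMS
    let second = unwrapper.unwrap_kek("kek-1", &wrapped); // hit: served from the cache
    assert_eq!(first, second);
    println!("KMS calls: {}", unwrapper.kms.calls.get()); // prints "KMS calls: 1"
}
```

A TTL-bounded cache, as used in the commit, additionally limits how long plaintext KEK material stays in memory. A plain map like this one does not.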
diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs
index 773d781d6..12ee76e5e 100644
--- a/crates/iceberg/src/encryption/mod.rs
+++ b/crates/iceberg/src/encryption/mod.rs
@@ -21,15 +21,15 @@
//! for encrypting and decrypting data in Iceberg tables.
mod crypto;
-mod file_decryptor;
-mod file_encryptor;
+pub(crate) mod io;
pub(crate) mod key_metadata;
pub mod kms;
+mod manager;
mod stream;
pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
-pub use file_decryptor::AesGcmFileDecryptor;
-pub use file_encryptor::AesGcmFileEncryptor;
+pub use io::{EncryptedInputFile, EncryptedOutputFile};
pub use key_metadata::StandardKeyMetadata;
pub use kms::{GeneratedKey, KeyManagementClient};
+pub use manager::EncryptionManager;
pub use stream::{AesGcmFileRead, AesGcmFileWrite};