xanderbailey commented on code in PR #2183:
URL: https://github.com/apache/iceberg-rust/pull/2183#discussion_r2910045177


##########
docs/rfcs/0003_table_encryption.md:
##########
@@ -0,0 +1,648 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+# Table Encryption
+
+## Background
+
+### Iceberg Spec: Encryption
+
+The [Iceberg table spec](https://iceberg.apache.org/spec/#table-metadata) 
defines encryption
+as a first-class concept. Tables may store an `encryption-keys` map in their 
metadata,
+snapshots may reference an `encryption-key-id`, and manifest files carry 
optional
+`key_metadata` bytes. Data files themselves can be encrypted either at the 
stream level
+(AES-GCM envelope encryption, the "AGS1" format) or natively by the file 
format (e.g.
+Parquet Modular Encryption).
+
+The Java implementation (`org.apache.iceberg.encryption`) is the reference and 
has been
+production-tested. It defines:
+
+- **`EncryptionManager`** -- orchestrates encrypt/decrypt of 
`InputFile`/`OutputFile`
+- **`KeyManagementClient`** -- pluggable KMS integration (wrap/unwrap keys)
+- **`EncryptedInputFile` / `EncryptedOutputFile`** -- thin wrappers pairing a 
raw file handle
+  with its `EncryptionKeyMetadata`
+- **`StandardEncryptionManager`** -- envelope encryption with key caching, 
AGS1 streams,
+  and Parquet native encryption support
+- **`StandardKeyMetadata`** -- Avro-serialized key metadata (wrapped DEK, AAD 
prefix, file length)
+- **`AesGcmInputStream` / `AesGcmOutputStream`** -- block-based stream 
encryption (AGS1 format)
+
+### Relationship to Storage Trait RFC
+
+[RFC 0002 (Making Storage a 
Trait)](https://github.com/apache/iceberg-rust/pull/2116) proposes
+converting `Storage` from an enum to a trait and removing the `Extensions` 
mechanism from
+`FileIOBuilder`. This encryption RFC is designed to work both with the current 
`Extensions`-based
+`FileIO` and with the future trait-based storage. Specific adaptation points 
are called out below.
+
+---
+
+## High-Level Architecture
+
+The encryption system uses envelope encryption with a chained key hierarchy, 
adapted from the
+Java implementation to Rust's ownership and async model. KMS-managed master 
keys wrap KEKs,
+which encrypt only manifest list key metadata. All other DEKs are protected by 
being stored
+inside their encrypted parent files.
+
+### Key Hierarchy
+
+```
+Master Key (in KMS)
+  └── wraps → KEK (Key Encryption Key) — stored KMS-wrapped in table metadata
+        └── encrypts → manifest list StandardKeyMetadata (AES-GCM, KEY_TIMESTAMP as AAD)
+              │
+              ├── manifest list DEK → encrypts manifest list file (AGS1)
+              │     └── manifest key_metadata (plaintext StandardKeyMetadata) stored in manifest list entries
+              │           └── manifest DEK → encrypts manifest file (AGS1)
+              │                 └── data file key_metadata (plaintext StandardKeyMetadata) stored in manifest entries
+              │                       └── data file DEK → encrypts data file (AGS1 or Parquet native)
+```
+
+- **Master keys** live in the KMS and never leave it
+- **KEKs** are wrapped by the master key via KMS (`kmsClient.wrapKey()`) and 
stored in
+  `TableMetadata.encryption_keys` with a `KEY_TIMESTAMP` property for rotation 
tracking
+- **DEKs** are generated as plaintext random bytes and stored in 
`StandardKeyMetadata` per file.
+  DEKs are **not** individually wrapped by a KEK. Instead, they are protected 
by being stored
+  inside their encrypted parent file:
+  - **Manifest list DEKs**: Their `StandardKeyMetadata` is AES-GCM encrypted 
by a KEK
+    (using `KEY_TIMESTAMP` as AAD) and stored as an `EncryptedKey` in table 
metadata
+  - **Manifest DEKs**: Their `StandardKeyMetadata` is stored as plaintext 
`key_metadata` bytes
+    in manifest list entries — protected because the manifest list file itself 
is encrypted
+  - **Data file DEKs**: Their `StandardKeyMetadata` is stored as plaintext 
`key_metadata` bytes
+    in manifest entries — protected because the manifest file itself is 
encrypted
+- KEKs are cached in memory (moka async cache with configurable TTL) to avoid 
redundant KMS calls
+- KEK rotation occurs automatically when a KEK exceeds its configurable 
lifespan (default 730 days per NIST SP 800-57)
+
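The rotation rule in the last bullet can be sketched as a plain age check. This is a minimal illustration, not the RFC's implementation: it assumes `KEY_TIMESTAMP` holds milliseconds since the Unix epoch, and the names `kek_expired` and `find_active_kek` are hypothetical.

```rust
/// Default KEK lifespan from the text above: 730 days, expressed in milliseconds.
const KEK_LIFESPAN_MS: i64 = 730 * 24 * 60 * 60 * 1000;

/// Returns true when the KEK is older than `lifespan_ms`.
/// Assumption: KEY_TIMESTAMP is stored as milliseconds since the Unix epoch.
fn kek_expired(key_timestamp_ms: i64, now_ms: i64, lifespan_ms: i64) -> bool {
    now_ms.saturating_sub(key_timestamp_ms) > lifespan_ms
}

/// Picks the newest unexpired KEK from `(key_id, key_timestamp_ms)` pairs.
/// Returns None when every KEK has expired, which is the point at which
/// a new KEK would be generated and wrapped via the KMS.
fn find_active_kek<'a>(keks: &'a [(String, i64)], now_ms: i64) -> Option<&'a str> {
    keks.iter()
        .filter(|(_, ts)| !kek_expired(*ts, now_ms, KEK_LIFESPAN_MS))
        .max_by_key(|(_, ts)| *ts)
        .map(|(id, _)| id.as_str())
}
```

Preferring the newest unexpired KEK is a design choice of this sketch; the text above only requires that an expired KEK is not reused for new writes.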
+### Component Overview
+
+```
+┌─────────────────────────────────────────────────────────────────────────────┐
+│                              User / Table Scan                              │
+└─────────────────────────────────────────────────────────────────────────────┘
+                                      │
+                                      ▼
+┌─────────────────────────────────────────────────────────────────────────────┐
+│                     EncryptionManager (trait)                               │
+│                                                                             │
+│  StandardEncryptionManager:                                                 │
+│  - Envelope encryption: Master → KEK → manifest list StandardKeyMetadata   │
+│  - DEKs are plaintext, protected by encrypted parent files                 │
+│  - KEK cache (moka async, configurable TTL)                                │
+│  - Automatic KEK rotation (730 days, KEY_TIMESTAMP tracking)               │
+│  - encrypt() / decrypt() for AGS1 stream files                            │
+│  - encrypt_native() for Parquet Modular Encryption                         │
+│  - wrap/unwrap_key_metadata() for manifest list keys (KEK + KMS)           │
+│  - generate_dek() for per-file plaintext DEK generation                    │
+└─────────────────────────────────────────────────────────────────────────────┘
+              │                              │
+              ▼                              ▼
+┌──────────────────────────┐   ┌──────────────────────────────────────────────┐
+│   KeyManagementClient    │   │              KEK Cache                       │
+│       (trait)            │   │                                              │
+│                          │   │  - moka::future::Cache with configurable TTL │
+│  wrap_key(key, key_id)   │   │  - Thread-safe async                        │
+│  unwrap_key(wrapped, id) │   │  - Caches plaintext KEK bytes per key ID    │
+│  initialize(props)       │   │                                              │
+└──────────────────────────┘   └──────────────────────────────────────────────┘
+              │
+              ▼
+┌──────────────────────────┐
+│    KMS Implementations   │
+│                          │
+│  - InMemoryKms (testing) │
+│  - AWS KMS (future)      │
+│  - Azure KV (future)     │
+│  - GCP KMS (future)      │
+└──────────────────────────┘
+```
+
+### Data Flow
+
+#### Read Path (Decryption)
+
+```
+TableMetadata
+  └── encryption_keys: {key_id → EncryptedKey}
+          │
+Snapshot  │
+  └── encryption_key_id ──────┘   (V3 format only)
+          │
+          ▼
+  load_manifest_list(file_io, table_metadata)
+    1. Look up encryption_key_id in table_metadata.encryption_keys
+       → get manifest list EncryptedKey
+    2. Find the KEK via EncryptedKey.encrypted_by_id
+       → unwrap KEK via KMS: kms_client.unwrap_key(kek.encrypted_key_metadata, table_key_id)
+       (KEK is cached to avoid redundant KMS calls)
+    3. AES-GCM decrypt the manifest list's StandardKeyMetadata using the
+       unwrapped KEK, with KEY_TIMESTAMP as AAD
+    4. Extract plaintext manifest list DEK from decrypted StandardKeyMetadata
+    5. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile
+          │
+          ▼
+ManifestFile
+  └── key_metadata: Option<Vec<u8>>  (plaintext StandardKeyMetadata, read from encrypted manifest list)
+          │
+  load_manifest(file_io)
+    1. If key_metadata present:
+       a. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix
+       b. file_io.new_encrypted_input() → AGS1-decrypting InputFile
+    2. If not: file_io.new_input()
+          │
+          ▼
+FileScanTask
+  └── key_metadata: Option<Vec<u8>>  (plaintext StandardKeyMetadata, read from encrypted manifest)
+          │
+  ArrowReader::create_parquet_record_batch_stream_builder_with_key_metadata()
+    1. If key_metadata present:
+       a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncrypted InputFile
+       b. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix
+       c. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix)
+       d. Pass to ParquetRecordBatchStreamBuilder
+    2. If not: standard Parquet read
+```
+
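Steps 1 and 2 of `load_manifest_list` amount to two map lookups. The sketch below illustrates that chain under stated assumptions: the `EncryptedKey` struct is a trimmed stand-in carrying only the fields named in the flow above, and `resolve_key_chain` is a hypothetical helper, not an API from this RFC. The KMS unwrap and AES-GCM decrypt steps are left out.

```rust
use std::collections::HashMap;

/// Trimmed stand-in for the table-metadata key entry (illustrative only).
struct EncryptedKey {
    key_id: String,
    encrypted_key_metadata: Vec<u8>,
    encrypted_by_id: Option<String>,
}

/// Resolves the manifest list key entry and the KEK entry that encrypted it.
/// Returns (manifest_list_key, kek) or a description of the missing link.
fn resolve_key_chain<'a>(
    keys: &'a HashMap<String, EncryptedKey>,
    snapshot_key_id: &str,
) -> Result<(&'a EncryptedKey, &'a EncryptedKey), String> {
    // Step 1: Snapshot.encryption_key_id -> manifest list EncryptedKey.
    let ml_key = keys
        .get(snapshot_key_id)
        .ok_or_else(|| format!("unknown encryption key id: {}", snapshot_key_id))?;
    // Step 2: follow encrypted_by_id to the KEK entry.
    let kek_id = ml_key
        .encrypted_by_id
        .as_deref()
        .ok_or_else(|| format!("key {} has no encrypted_by_id", ml_key.key_id))?;
    let kek = keys
        .get(kek_id)
        .ok_or_else(|| format!("unknown KEK id: {}", kek_id))?;
    Ok((ml_key, kek))
}
```

The caller would then pass `kek.encrypted_key_metadata` to the KMS client and use the unwrapped KEK to decrypt `ml_key.encrypted_key_metadata`.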
+#### Write Path (Encryption)
+
+```
+RollingFileWriter::new_output_file()
+    1. If file_io.encryption_manager() is Some:
+       a. file_io.new_native_encrypted_output(path) → EncryptedOutputFile
+       b. EncryptionManager generates random plaintext DEK + AAD prefix
+       c. OutputFile::NativeEncrypted carries NativeKeyMaterial for Parquet writer
+       d. Store plaintext StandardKeyMetadata as key_metadata bytes on DataFile
+          (protected by being stored inside the encrypted parent manifest)
+    2. ParquetWriter detects NativeEncrypted, configures FileEncryptionProperties
+
+SnapshotProducer::commit()
+    1. Manifest writing:
+       a. em.encrypt(output_file) → generates random plaintext DEK + AAD prefix
+       b. Write manifest to AGS1-encrypting OutputFile
+       c. Store plaintext StandardKeyMetadata as key_metadata on ManifestFile entry
+          (protected by being stored inside the encrypted parent manifest list)
+    2. Manifest list writing:
+       a. em.encrypt(output_file) → generates random plaintext DEK + AAD prefix
+       b. Write manifest list to AGS1-encrypting OutputFile
+       c. Get or create KEK:
+          - Find unexpired KEK (check KEY_TIMESTAMP, 730-day lifespan)
+          - If none: generate new KEK, wrap via KMS: kms_client.wrap_key(kek, table_key_id)
+       d. AES-GCM encrypt the manifest list's StandardKeyMetadata using the KEK,
+          with KEY_TIMESTAMP as AAD
+       e. Store as EncryptedKey (encrypted_by_id = kek_id) in encryption manager
+       f. Store manifest list key_id on Snapshot.encryption_key_id
+    3. Table commit includes AddEncryptionKey for all new entries:
+       - New KEKs (encrypted_by_id = table_key_id, properties include KEY_TIMESTAMP)
+       - New manifest list key metadata (encrypted_by_id = kek_id)
+```
+
+### Module Structure
+
+```
+crates/iceberg/src/
+├── encryption/
+│   ├── mod.rs                       # Module re-exports
+│   ├── crypto.rs                    # AES-GCM primitives (SecureKey, AesGcmEncryptor)
+│   ├── key_management.rs            # KeyManagementClient trait
+│   ├── key_metadata.rs              # StandardKeyMetadata (Avro V1, Java-compatible)
+│   ├── encryption_manager.rs        # EncryptionManager trait + StandardEncryptionManager
+│   ├── plaintext_encryption_manager.rs  # No-op pass-through for unencrypted tables
+│   ├── file_encryptor.rs            # FileEncryptor (write-side AGS1 wrapper)
+│   ├── file_decryptor.rs            # FileDecryptor (read-side AGS1 wrapper)
+│   ├── encrypted_io.rs              # EncryptedInputFile / EncryptedOutputFile wrappers
+│   ├── stream.rs                    # AesGcmFileRead / AesGcmFileWrite (AGS1 format)
+│   ├── kms/
+│   │   ├── mod.rs
+│   │   └── in_memory.rs             # InMemoryKms (testing only)
+│   └── integration_tests.rs         # End-to-end encryption round-trip tests
+├── io/
+│   └── file_io.rs                   # InputFile/OutputFile enums, FileIO encryption methods
+├── arrow/
+│   └── reader.rs                    # Parquet decryption via FileDecryptionProperties
+├── writer/file_writer/
+│   ├── parquet_writer.rs            # Parquet FileEncryptionProperties integration
+│   └── rolling_writer.rs            # Encrypted output file creation + key_metadata propagation
+├── transaction/
+│   └── snapshot.rs                  # Encrypted manifest/manifest list writing, KEK management
+├── scan/
+│   ├── context.rs                   # key_metadata propagation from DataFile → FileScanTask
+│   └── task.rs                      # FileScanTask.key_metadata field
+└── spec/
+    ├── snapshot.rs                  # Snapshot.encryption_key_id, load_manifest_list decryption
+    └── manifest_list.rs             # ManifestFile.key_metadata, load_manifest decryption
+```
+
+---
+
+## Design
+
+### Core Cryptographic Primitives
+
+#### EncryptionAlgorithm
+
+```rust
+pub enum EncryptionAlgorithm {
+    Aes128Gcm,
+    // Future: Aes256Gcm (depends on parquet-rs support)
+}
+```
+
+#### SecureKey
+
+Wraps key material with automatic zeroization on drop via 
`zeroize::Zeroizing<Vec<u8>>`:
+
+```rust
+pub struct SecureKey {
+    data: Zeroizing<Vec<u8>>,
+    algorithm: EncryptionAlgorithm,
+}
+
+impl SecureKey {
+    pub fn new(data: Vec<u8>, algorithm: EncryptionAlgorithm) -> Result<Self>;
+    pub fn generate(algorithm: EncryptionAlgorithm) -> Self;
+}
+```
+
+#### AesGcmEncryptor
+
+AES-GCM encrypt/decrypt. Ciphertext format matches the Java implementation:
+`[12-byte nonce][ciphertext][16-byte GCM tag]`.
+
+```rust
+pub struct AesGcmEncryptor { /* ... */ }
+
+impl AesGcmEncryptor {
+    pub fn new(key: SecureKey) -> Self;
+    pub fn encrypt(&self, plaintext: &[u8], aad: Option<&[u8]>) -> Result<Vec<u8>>;
+    pub fn decrypt(&self, ciphertext: &[u8], aad: Option<&[u8]>) -> Result<Vec<u8>>;
+}
+```
+
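The `[12-byte nonce][ciphertext][16-byte GCM tag]` layout can be illustrated with a small splitting helper. This is a sketch only: `split_aes_gcm` is a hypothetical name, and the actual decryption (which needs an AES-GCM crate) is omitted.

```rust
const NONCE_LEN: usize = 12;
const TAG_LEN: usize = 16;

/// Splits `[12-byte nonce][ciphertext][16-byte GCM tag]` into its three parts.
/// Returns None when the buffer is too short to hold both a nonce and a tag.
fn split_aes_gcm(buf: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
    if buf.len() < NONCE_LEN + TAG_LEN {
        return None;
    }
    let (nonce, rest) = buf.split_at(NONCE_LEN);
    let (ciphertext, tag) = rest.split_at(rest.len() - TAG_LEN);
    Some((nonce, ciphertext, tag))
}
```

An encrypted empty plaintext is therefore exactly 28 bytes long, and anything shorter can be rejected before attempting decryption.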
+### Key Management
+
+#### KeyManagementClient Trait
+
+Pluggable interface for KMS integration. Mirrors the Java 
`KeyManagementClient`:
+
+```rust
+#[async_trait]
+pub trait KeyManagementClient: Debug + Send + Sync {
+    async fn initialize(&mut self, properties: HashMap<String, String>) -> Result<()>;
+    async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result<Vec<u8>>;
+    async fn unwrap_key(&self, wrapped_key: &[u8], wrapping_key_id: &str) -> Result<Vec<u8>>;
+}
+```
+
+Users implement this trait to integrate with their KMS of choice (AWS KMS, 
Azure Key Vault,
+GCP KMS, HashiCorp Vault, etc.). An `InMemoryKms` is provided for testing.
+
+#### StandardKeyMetadata
+
+Avro-serialized metadata stored alongside encrypted files. Compatible with the 
Java
+`StandardKeyMetadata` format for cross-language interoperability:
+
+```rust
+pub struct StandardKeyMetadata {
+    encryption_key: Vec<u8>,  // Plaintext DEK (always plaintext, never individually wrapped)
+    aad_prefix: Vec<u8>,      // Additional authenticated data prefix
+    file_length: Option<i64>, // Optional encrypted file length
+}
+// Note: For manifest lists, the entire serialized StandardKeyMetadata is
+// AES-GCM encrypted by a KEK before storage. For manifests and data files,
+// the StandardKeyMetadata is stored as plaintext key_metadata in the parent
+// encrypted file.
+```
+
+Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with 
Java.
+
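The outer framing of the wire format can be sketched without an Avro library. The helpers below are hypothetical names and only handle the leading version byte; encoding and decoding the Avro datum itself is assumed to happen elsewhere.

```rust
/// Version byte that prefixes the Avro datum, per the wire format above.
const KEY_METADATA_V1: u8 = 0x01;

/// Prepends the version byte to an already-serialized Avro datum.
fn frame_key_metadata(avro_datum: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + avro_datum.len());
    out.push(KEY_METADATA_V1);
    out.extend_from_slice(avro_datum);
    out
}

/// Checks the version byte and returns the Avro datum that follows it.
fn unframe_key_metadata(buf: &[u8]) -> Result<&[u8], String> {
    match buf.split_first() {
        None => Err("empty key metadata".to_string()),
        Some((&KEY_METADATA_V1, datum)) => Ok(datum),
        Some((other, _)) => Err(format!("unsupported key metadata version: {}", other)),
    }
}
```

Rejecting unknown versions up front keeps a future format change from being misread as a V1 datum.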
+### EncryptionManager
+
+The `EncryptionManager` trait abstracts encryption orchestration. 
`StandardEncryptionManager`
+implements envelope encryption with KMS-backed KEK management, KEK caching, 
and rotation:
+
+```rust
+#[async_trait]
+pub trait EncryptionManager: Debug + Send + Sync {
+    /// Decrypt an AGS1 stream-encrypted file.
+    async fn decrypt(&self, encrypted: EncryptedInputFile) -> Result<InputFile>;
+
+    /// Encrypt a file with AGS1 stream encryption.
+    async fn encrypt(&self, raw_output: OutputFile) -> Result<EncryptedOutputFile>;
+
+    /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial).
+    async fn encrypt_native(&self, raw_output: OutputFile) -> Result<EncryptedOutputFile>;
+
+    /// Unwrap key metadata for a manifest list.
+    /// 1. Look up the manifest list's EncryptedKey by key ID
+    /// 2. Find the KEK via encrypted_by_id
+    /// 3. Unwrap the KEK via KMS:
+    ///    kms_client.unwrap_key(kek.encrypted_key_metadata, table_key_id)
+    /// 4. AES-GCM decrypt the manifest list's StandardKeyMetadata using the KEK,
+    ///    with KEY_TIMESTAMP as AAD
+    /// 5. Return the decrypted StandardKeyMetadata bytes (containing plaintext DEK)
+    async fn unwrap_key_metadata(
+        &self, encrypted_key: &EncryptedKey,
+        encryption_keys: &HashMap<String, EncryptedKey>,
+    ) -> Result<Vec<u8>>;
+
+    /// Wrap key metadata for a manifest list with a KEK for storage in table metadata.
+    /// 1. Get or create a KEK (wrapping new KEK via KMS if needed)
+    /// 2. AES-GCM encrypt the StandardKeyMetadata using the KEK, with KEY_TIMESTAMP as AAD
+    /// 3. Return the manifest list EncryptedKey (encrypted_by_id = kek_id)
+    ///    and optionally a new KEK EncryptedKey if one was created
+    async fn wrap_key_metadata(
+        &self, key_metadata: &[u8],
+    ) -> Result<(EncryptedKey, Option<EncryptedKey>)>;
+}
+```
+
+`StandardEncryptionManager` is typically not constructed directly by users. 
Instead,
+`TableBuilder::build()` constructs it automatically from a 
`KeyManagementClient`
+extension and the table's properties (see [Catalog 
Integration](#catalog-integration) below).
+For manual construction in tests:
+
+```rust
+let em = StandardEncryptionManager::new(Arc::new(kms_client))
+    .with_table_key_id("master-key-1")   // Master key ID in KMS
+    .with_encryption_keys(table_metadata.encryption_keys.clone());
+```
+
+### AGS1 Stream Encryption
+
+Block-based stream encryption format compatible with Java's 
`AesGcmInputStream`/`AesGcmOutputStream`.
+
+#### Format
+
+```
+┌──────────────────────────────────────────┐
+│ Header (8 bytes)                         │
+│   Magic: "AGS1" (4 bytes)               │
+│   Plain block size: u32 LE (4 bytes)     │
+│     Default: 1,048,576 (1 MiB)           │
+├──────────────────────────────────────────┤
+│ Block 0                                  │
+│   Nonce (12 bytes)                       │
+│   Ciphertext (up to plain_block_size)    │
+│   GCM Tag (16 bytes)                     │
+├──────────────────────────────────────────┤
+│ Block 1..N (same structure)              │
+├──────────────────────────────────────────┤
+│ Final block (may be shorter)             │
+└──────────────────────────────────────────┘
+```
+
+Each block's AAD is constructed as `aad_prefix || block_index (4 bytes LE)`, 
binding each
+block to its position in the stream to prevent reordering attacks.
+
+**`AesGcmFileRead`** implements the `FileRead` trait for transparent AGS1 
decryption with
+random-access reads. **`AesGcmFileWrite`** implements `FileWrite` for 
transparent AGS1
+encryption with block buffering.
+
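The header layout, the per-block AAD, and the block offsets that random-access reads depend on can be sketched with the standard library alone. The function names are hypothetical, and the offset formula assumes every block before the requested one is a full `plain_block_size` block, which holds for all blocks except the final one.

```rust
const MAGIC: &[u8; 4] = b"AGS1";
const HEADER_LEN: u64 = 8;
const NONCE_LEN: u64 = 12;
const TAG_LEN: u64 = 16;

/// Builds the 8-byte header: magic followed by the plain block size as u32 LE.
fn encode_header(plain_block_size: u32) -> [u8; 8] {
    let mut header = [0u8; 8];
    header[..4].copy_from_slice(MAGIC);
    header[4..].copy_from_slice(&plain_block_size.to_le_bytes());
    header
}

/// Validates the magic and returns the plain block size, or None if malformed.
fn parse_header(header: &[u8]) -> Option<u32> {
    if header.len() < 8 || header[..4] != MAGIC[..] {
        return None;
    }
    Some(u32::from_le_bytes([header[4], header[5], header[6], header[7]]))
}

/// Per-block AAD: `aad_prefix || block_index (4 bytes LE)`.
fn block_aad(aad_prefix: &[u8], block_index: u32) -> Vec<u8> {
    let mut aad = Vec::with_capacity(aad_prefix.len() + 4);
    aad.extend_from_slice(aad_prefix);
    aad.extend_from_slice(&block_index.to_le_bytes());
    aad
}

/// Byte offset in the encrypted file at which block `block_index` begins.
fn block_offset(block_index: u64, plain_block_size: u32) -> u64 {
    HEADER_LEN + block_index * (plain_block_size as u64 + NONCE_LEN + TAG_LEN)
}
```

Because every full block adds a fixed 28 bytes of nonce and tag, a reader can map a plaintext offset to a block index and seek directly to that block without scanning the file.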
+### InputFile and OutputFile Enums
+
+`InputFile` and `OutputFile` are enums with three variants each:
+
+```rust
+pub enum InputFile {
+    /// Standard unencrypted file.
+    Plain { storage: Arc<dyn Storage>, path: String },
+
+    /// AGS1 stream-encrypted file. Transparent decryption on read.
+    Encrypted { storage: Arc<dyn Storage>, path: String, decryptor: Arc<FileDecryptor> },
+
+    /// Parquet Modular Encryption. Raw reads; Parquet reader handles decryption.
+    NativeEncrypted { storage: Arc<dyn Storage>, path: String, key_material: NativeKeyMaterial },
+}
+
+pub enum OutputFile {
+    Plain { storage: Arc<dyn Storage>, path: String },
+    Encrypted { storage: Arc<dyn Storage>, path: String, encryptor: Arc<FileEncryptor> },
+    NativeEncrypted { storage: Arc<dyn Storage>, path: String, key_material: NativeKeyMaterial },
+}
+```
+
+`NativeKeyMaterial` carries the plaintext DEK and AAD prefix for Parquet's
+`FileEncryptionProperties` / `FileDecryptionProperties`.
+
+Common operations (`location()`, `exists()`, `read()`, `reader()`, `write()`, 
`writer()`)
+delegate to the appropriate variant, with `Encrypted` variants transparently 
encrypting/decrypting
+via `AesGcmFileRead`/`AesGcmFileWrite`.
+
+#### Adaptation for Storage Trait RFC
+
+Once RFC 0002 merges, `InputFile` is expected to hold `Arc<dyn Storage>` instead of
+`Operator`. The enums in this RFC already use `Arc<dyn Storage>`, so their structure
+stays stable; only the underlying `Storage` trait implementation may change.
+
+### FileIO Integration
+
+The `EncryptionManager` is stored as a type-safe `FileIOBuilder` extension. 
This integrates
+naturally with catalogs that support extensions (e.g. 
`RestCatalog.with_file_io_extension()`):
+
+```rust
+// Via FileIOBuilder extension (works with RestCatalog and any extension-aware catalog)
+let file_io = FileIOBuilder::new("s3")
+    .with_prop("s3.region", "us-east-1")
+    .with_extension(encryption_manager)
+    .build()?;
+
+// Or via convenience method on FileIO
+let file_io = file_io.with_encryption_manager(encryption_manager);
+```
+
+FileIO provides encryption-aware factory methods:
+
+| Method | Purpose |
+|--------|---------|
+| `new_encrypted_input(path, key_metadata)` | AGS1 stream decryption (manifests, manifest lists) |
+| `new_encrypted_output(path)` | AGS1 stream encryption |
+| `new_native_encrypted_input(path, key_metadata)` | PME input (Parquet handles decryption) |
+| `new_native_encrypted_output(path)` | PME output (Parquet handles encryption) |
+| `encryption_manager()` | Returns the configured EncryptionManager, if any |
+
+#### After Storage Trait RFC
+
+RFC 0002 removes `Extensions` from `FileIOBuilder`. The `EncryptionManager` 
will instead be
+provided through the `StorageFactory` or configured at the catalog level:
+
+```rust
+// Option A: EncryptionManager on the catalog
+let catalog = GlueCatalogBuilder::default()
+    .with_storage_factory(Arc::new(OpenDalStorageFactory::S3))
+    .with_encryption_manager(encryption_manager)

Review Comment:
   Oh interesting, is this where they could provide a way to construct a number of KMS clients?


