This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a1ec0fa  Object Cache: caches parsed Manifests and ManifestLists for performance (#512)
a1ec0fa is described below

commit a1ec0fa98113fc02b2479d81759fccd9dab10378
Author: Scott Donnelly <[email protected]>
AuthorDate: Mon Aug 19 10:48:43 2024 +0100

    Object Cache: caches parsed Manifests and ManifestLists for performance (#512)
    
    * feat: adds ObjectCache, to cache Manifests and ManifestLists
    
    * refactor: change obj cache method names and use more readable default usize value
    
    * chore: improve error message
    
    Co-authored-by: Renjie Liu <[email protected]>
    
    * fix: change object cache retrieval method visibility
    
    Co-authored-by: Renjie Liu <[email protected]>
    
    * feat: improved error message in object cache get_manifest
    
    * test(object-cache): add unit tests for object cache manifest and manifest list retrieval
    
    * fix: ensure that object cache insertions are weighted by size
    
    * test: fix test typo
    
    * fix: ensure object cache weight is that of the wrapped item, not the Arc
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 crates/catalog/glue/src/catalog.rs        |  12 +-
 crates/catalog/hms/src/catalog.rs         |  12 +-
 crates/catalog/memory/src/catalog.rs      |  13 +-
 crates/catalog/rest/src/catalog.rs        |  16 +-
 crates/iceberg/Cargo.toml                 |   1 +
 crates/iceberg/src/io/mod.rs              |   2 +
 crates/iceberg/src/io/object_cache.rs     | 402 ++++++++++++++++++++++++++++++
 crates/iceberg/src/scan.rs                |  49 ++--
 crates/iceberg/src/spec/manifest.rs       |   2 +-
 crates/iceberg/src/spec/table_metadata.rs |   2 +-
 crates/iceberg/src/table.rs               | 154 +++++++++++-
 crates/iceberg/src/transaction.rs         |   2 +
 12 files changed, 599 insertions(+), 68 deletions(-)

diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs
index 16acaa7..18e30f3 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -381,14 +381,12 @@ impl Catalog for GlueCatalog {
 
         builder.send().await.map_err(from_aws_sdk_error)?;
 
-        let table = Table::builder()
+        Table::builder()
             .file_io(self.file_io())
             .metadata_location(metadata_location)
             .metadata(metadata)
             .identifier(TableIdent::new(NamespaceIdent::new(db_name), 
table_name))
-            .build();
-
-        Ok(table)
+            .build()
     }
 
     /// Loads a table from the Glue Catalog and constructs a `Table` object
@@ -432,7 +430,7 @@ impl Catalog for GlueCatalog {
                 let metadata_content = input_file.read().await?;
                 let metadata = 
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
 
-                let table = Table::builder()
+                Table::builder()
                     .file_io(self.file_io())
                     .metadata_location(metadata_location)
                     .metadata(metadata)
@@ -440,9 +438,7 @@ impl Catalog for GlueCatalog {
                         NamespaceIdent::new(db_name),
                         table_name.to_owned(),
                     ))
-                    .build();
-
-                Ok(table)
+                    .build()
             }
         }
     }
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index 524ecee..6e5db19 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -369,14 +369,12 @@ impl Catalog for HmsCatalog {
             .await
             .map_err(from_thrift_error)?;
 
-        let table = Table::builder()
+        Table::builder()
             .file_io(self.file_io())
             .metadata_location(metadata_location)
             .metadata(metadata)
             .identifier(TableIdent::new(NamespaceIdent::new(db_name), 
table_name))
-            .build();
-
-        Ok(table)
+            .build()
     }
 
     /// Loads a table from the Hive Metastore and constructs a `Table` object
@@ -407,7 +405,7 @@ impl Catalog for HmsCatalog {
         let metadata_content = 
self.file_io.new_input(&metadata_location)?.read().await?;
         let metadata = 
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
 
-        let table = Table::builder()
+        Table::builder()
             .file_io(self.file_io())
             .metadata_location(metadata_location)
             .metadata(metadata)
@@ -415,9 +413,7 @@ impl Catalog for HmsCatalog {
                 NamespaceIdent::new(db_name),
                 table.name.clone(),
             ))
-            .build();
-
-        Ok(table)
+            .build()
     }
 
     /// Asynchronously drops a table from the database.
diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs
index 44086f8..05038cb 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -209,14 +209,12 @@ impl Catalog for MemoryCatalog {
 
         root_namespace_state.insert_new_table(&table_ident, 
metadata_location.clone())?;
 
-        let table = Table::builder()
+        Table::builder()
             .file_io(self.file_io.clone())
             .metadata_location(metadata_location)
             .metadata(metadata)
             .identifier(table_ident)
-            .build();
-
-        Ok(table)
+            .build()
     }
 
     /// Load table from the catalog.
@@ -227,14 +225,13 @@ impl Catalog for MemoryCatalog {
         let input_file = self.file_io.new_input(metadata_location)?;
         let metadata_content = input_file.read().await?;
         let metadata = 
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
-        let table = Table::builder()
+
+        Table::builder()
             .file_io(self.file_io.clone())
             .metadata_location(metadata_location.clone())
             .metadata(metadata)
             .identifier(table_ident.clone())
-            .build();
-
-        Ok(table)
+            .build()
     }
 
     /// Drop a table from the catalog.
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index d74c8de..2afefad 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -516,7 +516,7 @@ impl Catalog for RestCatalog {
             .load_file_io(resp.metadata_location.as_deref(), resp.config)
             .await?;
 
-        let table = Table::builder()
+        Table::builder()
             .identifier(table_ident)
             .file_io(file_io)
             .metadata(resp.metadata)
@@ -526,9 +526,7 @@ impl Catalog for RestCatalog {
                     "Metadata location missing in create table response!",
                 )
             })?)
-            .build();
-
-        Ok(table)
+            .build()
     }
 
     /// Load table from the catalog.
@@ -560,9 +558,9 @@ impl Catalog for RestCatalog {
             .metadata(resp.metadata);
 
         if let Some(metadata_location) = resp.metadata_location {
-            Ok(table_builder.metadata_location(metadata_location).build())
+            table_builder.metadata_location(metadata_location).build()
         } else {
-            Ok(table_builder.build())
+            table_builder.build()
         }
     }
 
@@ -661,12 +659,12 @@ impl Catalog for RestCatalog {
         let file_io = self
             .load_file_io(Some(&resp.metadata_location), None)
             .await?;
-        Ok(Table::builder()
+        Table::builder()
             .identifier(commit.identifier().clone())
             .file_io(file_io)
             .metadata(resp.metadata)
             .metadata_location(resp.metadata_location)
-            .build())
+            .build()
     }
 }
 
@@ -1661,6 +1659,7 @@ mod tests {
                 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
                 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
                 .build()
+                .unwrap()
         };
 
         let table = Transaction::new(&table1)
@@ -1785,6 +1784,7 @@ mod tests {
                 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
                 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
                 .build()
+                .unwrap()
         };
 
         let table_result = Transaction::new(&table1)
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 8b0a7e1..6218e98 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -60,6 +60,7 @@ derive_builder = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
+moka = { version = "0.12.8", features = ["future"] }
 murmur3 = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index b8b7e27..52a1da2 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -78,8 +78,10 @@ use storage_memory::*;
 mod storage_s3;
 #[cfg(feature = "storage-s3")]
 pub use storage_s3::*;
+pub(crate) mod object_cache;
 #[cfg(feature = "storage-fs")]
 mod storage_fs;
+
 #[cfg(feature = "storage-fs")]
 use storage_fs::*;
 #[cfg(feature = "storage-gcs")]
diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs
new file mode 100644
index 0000000..3b89a4e
--- /dev/null
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::io::FileIO;
+use crate::spec::{
+    FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, 
SnapshotRef, TableMetadataRef,
+};
+use crate::{Error, ErrorKind, Result};
+
+/// Capacity used by [`ObjectCache::new`]: the maximum total weight (as computed
+/// by the cache's weigher, in bytes) of all cached objects, i.e. 32 MiB.
+const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32MB
+
+/// A deserialized object stored in the [`ObjectCache`].
+///
+/// Payloads are wrapped in `Arc`, so a cache hit hands the caller a shared
+/// reference to the cached object rather than a copy of it.
+#[derive(Clone, Debug)]
+pub(crate) enum CachedItem {
+    /// A parsed manifest list (stored under [`CachedObjectKey::ManifestList`]).
+    ManifestList(Arc<ManifestList>),
+    /// A parsed manifest (stored under [`CachedObjectKey::Manifest`]).
+    Manifest(Arc<Manifest>),
+}
+
+/// Key under which a [`CachedItem`] is stored in the [`ObjectCache`].
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub(crate) enum CachedObjectKey {
+    /// Manifest list location, the table's format version and a schema id, as
+    /// assembled by `ObjectCache::get_manifest_list`.
+    ManifestList((String, FormatVersion, SchemaId)),
+    /// Path of the manifest file.
+    Manifest(String),
+}
+
+/// Caches metadata objects deserialized from immutable files.
+///
+/// Lookups return the cached `Arc` on a hit and read + parse the file through
+/// `file_io` on a miss, inserting the parsed result.
+#[derive(Clone, Debug)]
+pub struct ObjectCache {
+    /// Parsed objects keyed by [`CachedObjectKey`]; never consulted when
+    /// `cache_disabled` is `true`.
+    cache: moka::future::Cache<CachedObjectKey, CachedItem>,
+    /// Used to read the underlying files on a miss, or on every lookup when
+    /// caching is disabled.
+    file_io: FileIO,
+    /// When `true`, every lookup bypasses `cache` and loads through `file_io`.
+    cache_disabled: bool,
+}
+
+impl ObjectCache {
+    /// Creates a new [`ObjectCache`] with the default capacity of
+    /// [`DEFAULT_CACHE_SIZE_BYTES`].
+    pub(crate) fn new(file_io: FileIO) -> Self {
+        Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES)
+    }
+
+    /// Creates a new [`ObjectCache`] whose total estimated weight is capped at
+    /// `cache_size_bytes`.
+    ///
+    /// A capacity of `0` yields a cache with caching disabled, exactly as
+    /// [`ObjectCache::with_disabled_cache`] does.
+    pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self {
+        if cache_size_bytes == 0 {
+            return Self::with_disabled_cache(file_io);
+        }
+
+        Self {
+            cache: moka::future::Cache::builder()
+                .weigher(|_, val: &CachedItem| Self::estimate_weight(val))
+                .max_capacity(cache_size_bytes)
+                .build(),
+            file_io,
+            cache_disabled: false,
+        }
+    }
+
+    /// Creates a new [`ObjectCache`] with caching disabled: every lookup reads
+    /// and parses the file through `file_io`.
+    pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
+        Self {
+            cache: moka::future::Cache::new(0),
+            file_io,
+            cache_disabled: true,
+        }
+    }
+
+    /// Estimates how many bytes `item` occupies, for use as its cache weight.
+    ///
+    /// `size_of_val` of the item alone only covers the top-level struct; a
+    /// `Manifest`'s entries, for example, live in a `Vec` whose inline size is
+    /// just its header. Used on its own, that makes every item weigh about the
+    /// same regardless of how many entries it holds, so the entries are added
+    /// here. Heap data nested inside each entry is still not counted, so this
+    /// is a lower bound.
+    ///
+    /// The result saturates at `u32::MAX`, the largest weight the cache
+    /// accepts, instead of silently truncating.
+    fn estimate_weight(item: &CachedItem) -> u32 {
+        let bytes = match item {
+            CachedItem::ManifestList(manifest_list) => {
+                std::mem::size_of_val(manifest_list.as_ref())
+                    + manifest_list.entries().len() * std::mem::size_of::<ManifestFile>()
+            }
+            CachedItem::Manifest(manifest) => {
+                std::mem::size_of_val(manifest.as_ref())
+                    + manifest
+                        .entries()
+                        .iter()
+                        .map(|entry| std::mem::size_of_val(&**entry))
+                        .sum::<usize>()
+            }
+        };
+        u32::try_from(bytes).unwrap_or(u32::MAX)
+    }
+
+    /// Retrieves an `Arc` [`Manifest`] from the cache, or loads it from
+    /// `file_io`, parses it and caches it if not present.
+    ///
+    /// # Errors
+    /// Returns an `Unexpected` error (with the load failure as its source) if
+    /// the manifest cannot be loaded. It also returns an error if the cached
+    /// entry for this key is not a manifest.
+    pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
+        if self.cache_disabled {
+            return manifest_file
+                .load_manifest(&self.file_io)
+                .await
+                .map(Arc::new);
+        }
+
+        let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone());
+
+        let cache_entry = self
+            .cache
+            .entry_by_ref(&key)
+            .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file))
+            .await
+            .map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Failed to load manifest {}", manifest_file.manifest_path),
+                )
+                .with_source(err)
+            })?
+            .into_value();
+
+        match cache_entry {
+            CachedItem::Manifest(arc_manifest) => Ok(arc_manifest),
+            CachedItem::ManifestList(_) => Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("cached object for key '{:?}' is not a Manifest", key),
+            )),
+        }
+    }
+
+    /// Retrieves an `Arc` [`ManifestList`] from the cache, or loads it from
+    /// `file_io`, parses it and caches it if not present.
+    ///
+    /// # Errors
+    /// Returns an `Unexpected` error (with the load failure as its source) if
+    /// the manifest list cannot be loaded. It also returns an error if the
+    /// cached entry for this key is not a manifest list.
+    pub(crate) async fn get_manifest_list(
+        &self,
+        snapshot: &SnapshotRef,
+        table_metadata: &TableMetadataRef,
+    ) -> Result<Arc<ManifestList>> {
+        if self.cache_disabled {
+            return snapshot
+                .load_manifest_list(&self.file_io, table_metadata)
+                .await
+                .map(Arc::new);
+        }
+
+        let key = CachedObjectKey::ManifestList((
+            snapshot.manifest_list().to_string(),
+            table_metadata.format_version,
+            // A snapshot's schema id is optional, so fall back to the table's
+            // current schema id instead of panicking. The manifest list location
+            // already identifies the (immutable) file, so the fallback cannot
+            // cause two different lists to share a key.
+            snapshot
+                .schema_id()
+                .unwrap_or_else(|| table_metadata.current_schema().schema_id()),
+        ));
+
+        let cache_entry = self
+            .cache
+            .entry_by_ref(&key)
+            .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata))
+            .await
+            .map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Failed to load manifest list {}", snapshot.manifest_list()),
+                )
+                .with_source(err)
+            })?
+            .into_value();
+
+        match cache_entry {
+            CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list),
+            CachedItem::Manifest(_) => Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("cached object for path '{:?}' is not a manifest list", key),
+            )),
+        }
+    }
+
+    /// Loads and parses a manifest, wrapping it as a [`CachedItem`].
+    async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result<CachedItem> {
+        let manifest = manifest_file.load_manifest(&self.file_io).await?;
+
+        Ok(CachedItem::Manifest(Arc::new(manifest)))
+    }
+
+    /// Loads and parses a snapshot's manifest list, wrapping it as a
+    /// [`CachedItem`].
+    async fn fetch_and_parse_manifest_list(
+        &self,
+        snapshot: &SnapshotRef,
+        table_metadata: &TableMetadataRef,
+    ) -> Result<CachedItem> {
+        let manifest_list = snapshot
+            .load_manifest_list(&self.file_io, table_metadata)
+            .await?;
+
+        Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs;
+
+    use tempfile::TempDir;
+    use tera::{Context, Tera};
+    use uuid::Uuid;
+
+    use super::*;
+    use crate::io::{FileIO, OutputFile};
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
+        ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
+        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
+    };
+    use crate::table::Table;
+    use crate::TableIdent;
+
+    /// A table whose metadata is rendered from the
+    /// `testdata/example_table_metadata_v2.json` template, located inside a
+    /// temporary directory.
+    struct TableTestFixture {
+        /// Keeps the temporary directory alive for as long as the fixture.
+        /// `TempDir` deletes its directory when dropped. If it went out of scope
+        /// at the end of `new`, the files written later would land outside any
+        /// managed temporary directory and never be cleaned up.
+        _tmp_dir: TempDir,
+        table_location: String,
+        table: Table,
+    }
+
+    impl TableTestFixture {
+        fn new() -> Self {
+            let tmp_dir = TempDir::new().unwrap();
+            let table_location = tmp_dir.path().join("table1");
+            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
+            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
+            let table_metadata1_location = table_location.join("metadata/v1.json");
+
+            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+                .unwrap()
+                .build()
+                .unwrap();
+
+            let table_metadata = {
+                let template_json_str = fs::read_to_string(format!(
+                    "{}/testdata/example_table_metadata_v2.json",
+                    env!("CARGO_MANIFEST_DIR")
+                ))
+                .unwrap();
+                let mut context = Context::new();
+                context.insert("table_location", &table_location);
+                context.insert("manifest_list_1_location", &manifest_list1_location);
+                context.insert("manifest_list_2_location", &manifest_list2_location);
+                context.insert("table_metadata_1_location", &table_metadata1_location);
+
+                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
+                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+            };
+
+            let table = Table::builder()
+                .metadata(table_metadata)
+                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+                .file_io(file_io)
+                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .build()
+                .unwrap();
+
+            Self {
+                _tmp_dir: tmp_dir,
+                table_location: table_location.to_str().unwrap().to_string(),
+                table,
+            }
+        }
+
+        /// Returns an output file at a fresh, uniquely named manifest path.
+        fn next_manifest_file(&self) -> OutputFile {
+            self.table
+                .file_io()
+                .new_output(format!(
+                    "{}/metadata/manifest_{}.avro",
+                    self.table_location,
+                    Uuid::new_v4()
+                ))
+                .unwrap()
+        }
+
+        /// Writes one data manifest with a single `1.parquet` entry, plus the
+        /// current snapshot's manifest list that references it.
+        async fn setup_manifest_files(&mut self) {
+            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
+            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
+            let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();
+
+            // Write data files
+            let data_file_manifest = ManifestWriter::new(
+                self.next_manifest_file(),
+                current_snapshot.snapshot_id(),
+                vec![],
+            )
+            .write(Manifest::new(
+                ManifestMetadata::builder()
+                    .schema((*current_schema).clone())
+                    .content(ManifestContentType::Data)
+                    .format_version(FormatVersion::V2)
+                    .partition_spec((**current_partition_spec).clone())
+                    .schema_id(current_schema.schema_id())
+                    .build(),
+                vec![ManifestEntry::builder()
+                    .status(ManifestStatus::Added)
+                    .data_file(
+                        DataFileBuilder::default()
+                            .content(DataContentType::Data)
+                            .file_path(format!("{}/1.parquet", &self.table_location))
+                            .file_format(DataFileFormat::Parquet)
+                            .file_size_in_bytes(100)
+                            .record_count(1)
+                            .partition(Struct::from_iter([Some(Literal::long(100))]))
+                            .build()
+                            .unwrap(),
+                    )
+                    .build()],
+            ))
+            .await
+            .unwrap();
+
+            // Write to manifest list
+            let mut manifest_list_write = ManifestListWriter::v2(
+                self.table
+                    .file_io()
+                    .new_output(current_snapshot.manifest_list())
+                    .unwrap(),
+                current_snapshot.snapshot_id(),
+                current_snapshot
+                    .parent_snapshot_id()
+                    .unwrap_or(EMPTY_SNAPSHOT_ID),
+                current_snapshot.sequence_number(),
+            );
+            manifest_list_write
+                .add_manifests(vec![data_file_manifest].into_iter())
+                .unwrap();
+            manifest_list_write.close().await.unwrap();
+        }
+    }
+
+    /// Returns the file name of the data file referenced by the first entry
+    /// of `manifest`.
+    fn first_entry_file_name(manifest: &Manifest) -> &str {
+        manifest
+            .entries()
+            .first()
+            .unwrap()
+            .file_path()
+            .rsplit('/')
+            .next()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_get_manifest_list_and_manifest_from_disabled_cache() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone());
+        let snapshot = fixture.table.metadata().current_snapshot().unwrap();
+        let table_metadata = fixture.table.metadata_ref();
+
+        let result_manifest_list = object_cache
+            .get_manifest_list(snapshot, &table_metadata)
+            .await
+            .unwrap();
+        assert_eq!(result_manifest_list.entries().len(), 1);
+
+        // with caching disabled, every lookup re-reads and re-parses the file
+        let reloaded_manifest_list = object_cache
+            .get_manifest_list(snapshot, &table_metadata)
+            .await
+            .unwrap();
+        assert!(!Arc::ptr_eq(&result_manifest_list, &reloaded_manifest_list));
+
+        let manifest_file = result_manifest_list.entries().first().unwrap();
+        let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
+        assert_eq!(first_entry_file_name(&result_manifest), "1.parquet");
+
+        let reloaded_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
+        assert!(!Arc::ptr_eq(&result_manifest, &reloaded_manifest));
+    }
+
+    #[tokio::test]
+    async fn test_get_manifest_list_and_manifest_from_default_cache() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        let object_cache = ObjectCache::new(fixture.table.file_io().clone());
+        let snapshot = fixture.table.metadata().current_snapshot().unwrap();
+        let table_metadata = fixture.table.metadata_ref();
+
+        // not in cache: loaded from storage and inserted
+        let result_manifest_list = object_cache
+            .get_manifest_list(snapshot, &table_metadata)
+            .await
+            .unwrap();
+        assert_eq!(result_manifest_list.entries().len(), 1);
+
+        // retrieve cached version: the very same allocation is handed back
+        let cached_manifest_list = object_cache
+            .get_manifest_list(snapshot, &table_metadata)
+            .await
+            .unwrap();
+        assert!(Arc::ptr_eq(&result_manifest_list, &cached_manifest_list));
+        assert_eq!(cached_manifest_list.entries().len(), 1);
+
+        let manifest_file = cached_manifest_list.entries().first().unwrap();
+
+        // not in cache
+        let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
+        assert_eq!(first_entry_file_name(&result_manifest), "1.parquet");
+
+        // retrieve cached version
+        let cached_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
+        assert!(Arc::ptr_eq(&result_manifest, &cached_manifest));
+        assert_eq!(first_entry_file_name(&cached_manifest), "1.parquet");
+    }
+}
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 8a178dd..04aa1f5 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -32,6 +32,7 @@ use 
crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato
 use crate::expr::visitors::inclusive_projection::InclusiveProjection;
 use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::runtime::spawn;
 use crate::spec::{
@@ -242,7 +243,7 @@ impl<'a> TableScanBuilder<'a> {
             case_sensitive: self.case_sensitive,
             predicate: self.filter.map(Arc::new),
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
-            file_io: self.table.file_io().clone(),
+            object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
@@ -292,7 +293,7 @@ struct PlanContext {
     case_sensitive: bool,
     predicate: Option<Arc<Predicate>>,
     snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
-    file_io: FileIO,
+    object_cache: Arc<ObjectCache>,
     field_ids: Arc<Vec<i32>>,
 
     partition_filter_cache: Arc<PartitionFilterCache>,
@@ -454,8 +455,8 @@ struct ManifestFileContext {
     sender: Sender<ManifestEntryContext>,
 
     field_ids: Arc<Vec<i32>>,
-    file_io: FileIO,
     bound_predicates: Option<Arc<BoundPredicates>>,
+    object_cache: Arc<ObjectCache>,
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
 }
@@ -477,24 +478,22 @@ impl ManifestFileContext {
     /// streaming its constituent [`ManifestEntries`] to the channel provided 
in the context
     async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> {
         let ManifestFileContext {
-            file_io,
+            object_cache,
             manifest_file,
             bound_predicates,
             snapshot_schema,
             field_ids,
-            expression_evaluator_cache,
             mut sender,
+            expression_evaluator_cache,
             ..
         } = self;
 
-        let file_io_cloned = file_io.clone();
-        let manifest = manifest_file.load_manifest(&file_io_cloned).await?;
-
-        let (entries, _) = manifest.consume();
+        let manifest = object_cache.get_manifest(&manifest_file).await?;
 
-        for manifest_entry in entries.into_iter() {
+        for manifest_entry in manifest.entries() {
             let manifest_entry_context = ManifestEntryContext {
-                manifest_entry,
+                // TODO: refactor to avoid clone
+                manifest_entry: manifest_entry.clone(),
                 expression_evaluator_cache: expression_evaluator_cache.clone(),
                 field_ids: field_ids.clone(),
                 partition_spec_id: manifest_file.partition_spec_id,
@@ -530,9 +529,10 @@ impl ManifestEntryContext {
 }
 
 impl PlanContext {
-    async fn get_manifest_list(&self) -> Result<ManifestList> {
-        self.snapshot
-            .load_manifest_list(&self.file_io, &self.table_metadata)
+    async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> {
+        self.object_cache
+            .as_ref()
+            .get_manifest_list(&self.snapshot, &self.table_metadata)
             .await
     }
 
@@ -559,19 +559,19 @@ impl PlanContext {
 
     fn build_manifest_file_contexts(
         &self,
-        manifest_list: ManifestList,
+        manifest_list: Arc<ManifestList>,
         sender: Sender<ManifestEntryContext>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
         let filtered_entries = manifest_list
-            .consume_entries()
-            .into_iter()
+            .entries()
+            .iter()
             .filter(|manifest_file| manifest_file.content == 
ManifestContentType::Data);
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an 
iterator.
         let mut filtered_mfcs = vec![];
         if self.predicate.is_some() {
             for manifest_file in filtered_entries {
-                let partition_bound_predicate = 
self.get_partition_filter(&manifest_file)?;
+                let partition_bound_predicate = 
self.get_partition_filter(manifest_file)?;
 
                 // evaluate the ManifestFile against the partition filter. Skip
                 // if it cannot contain any matching rows
@@ -581,7 +581,7 @@ impl PlanContext {
                         manifest_file.partition_spec_id,
                         partition_bound_predicate.clone(),
                     )
-                    .eval(&manifest_file)?
+                    .eval(manifest_file)?
                 {
                     let mfc = self.create_manifest_file_context(
                         manifest_file,
@@ -603,7 +603,7 @@ impl PlanContext {
 
     fn create_manifest_file_context(
         &self,
-        manifest_file: ManifestFile,
+        manifest_file: &ManifestFile,
         partition_filter: Option<Arc<BoundPredicate>>,
         sender: Sender<ManifestEntryContext>,
     ) -> ManifestFileContext {
@@ -620,10 +620,10 @@ impl PlanContext {
             };
 
         ManifestFileContext {
-            manifest_file,
+            manifest_file: manifest_file.clone(),
             bound_predicates,
             sender,
-            file_io: self.file_io.clone(),
+            object_cache: self.object_cache.clone(),
             snapshot_schema: self.snapshot_schema.clone(),
             field_ids: self.field_ids.clone(),
             expression_evaluator_cache: 
self.expression_evaluator_cache.clone(),
@@ -938,9 +938,10 @@ mod tests {
             let table = Table::builder()
                 .metadata(table_metadata)
                 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
-                .file_io(file_io)
+                .file_io(file_io.clone())
                 
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
-                .build();
+                .build()
+                .unwrap();
 
             Self {
                 table_location: table_location.to_str().unwrap().to_string(),
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 14b8a80..eb3d022 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -95,7 +95,7 @@ impl Manifest {
     }
 
     /// Consume this Manifest, returning its constituent parts
-    pub fn consume(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
+    pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
         let Self { entries, metadata } = self;
         (entries, metadata)
     }
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index cd7f046..83b6017 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -856,7 +856,7 @@ pub(super) mod _serde {
     }
 }
 
-#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, 
Hash)]
 #[repr(u8)]
 /// Iceberg format version
 pub enum FormatVersion {
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index d28d6e5..406f9dd 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -16,28 +16,156 @@
 // under the License.
 
 //! Table API for Apache Iceberg
-use typed_builder::TypedBuilder;
+
+use std::sync::Arc;
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::scan::TableScanBuilder;
 use crate::spec::{TableMetadata, TableMetadataRef};
-use crate::{Result, TableIdent};
+use crate::{Error, ErrorKind, Result, TableIdent};
+
+/// Builder to create a [`Table`].
+pub struct TableBuilder {
+    file_io: Option<FileIO>,
+    metadata_location: Option<String>,
+    metadata: Option<TableMetadataRef>,
+    identifier: Option<TableIdent>,
+    readonly: bool,
+    disable_cache: bool,
+    cache_size_bytes: Option<u64>,
+}
+
+impl TableBuilder {
+    pub(crate) fn new() -> Self {
+        Self {
+            file_io: None,
+            metadata_location: None,
+            metadata: None,
+            identifier: None,
+            readonly: false,
+            disable_cache: false,
+            cache_size_bytes: None,
+        }
+    }
+
+    /// required - sets the necessary FileIO to use for the table
+    pub fn file_io(mut self, file_io: FileIO) -> Self {
+        self.file_io = Some(file_io);
+        self
+    }
+
+    /// optional - sets the table's metadata location
+    pub fn metadata_location<T: Into<String>>(mut self, metadata_location: T) -> Self {
+        self.metadata_location = Some(metadata_location.into());
+        self
+    }
+
+    /// required - passes in the TableMetadata to use for the Table
+    pub fn metadata<T: Into<TableMetadataRef>>(mut self, metadata: T) -> Self {
+        self.metadata = Some(metadata.into());
+        self
+    }
+
+    /// required - passes in the TableIdent to use for the Table
+    pub fn identifier(mut self, identifier: TableIdent) -> Self {
+        self.identifier = Some(identifier);
+        self
+    }
+
+    /// optional - specifies whether the Table is readonly (default: false)
+    pub fn readonly(mut self, readonly: bool) -> Self {
+        self.readonly = readonly;
+        self
+    }
+
+    /// optional - disables the Table's metadata cache so that reads of
+    /// Manifests and ManifestLists are never cached; takes precedence
+    /// over any value passed to `cache_size_bytes`.
+    pub fn disable_cache(mut self) -> Self {
+        self.disable_cache = true;
+        self
+    }
+
+    /// optional - sets a non-default metadata cache capacity, in bytes
+    pub fn cache_size_bytes(mut self, cache_size_bytes: u64) -> Self {
+        self.cache_size_bytes = Some(cache_size_bytes);
+        self
+    }
+
+    /// Builds the Table; errors if file_io, metadata or identifier is unset
+    pub fn build(self) -> Result<Table> {
+        let Self {
+            file_io,
+            metadata_location,
+            metadata,
+            identifier,
+            readonly,
+            disable_cache,
+            cache_size_bytes,
+        } = self;
+
+        let Some(file_io) = file_io else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "FileIO must be provided with TableBuilder.file_io()",
+            ));
+        };
+
+        let Some(metadata) = metadata else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "TableMetadataRef must be provided with TableBuilder.metadata()",
+            ));
+        };
+
+        let Some(identifier) = identifier else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "TableIdent must be provided with TableBuilder.identifier()",
+            ));
+        };
+
+        let object_cache = if disable_cache {
+            Arc::new(ObjectCache::with_disabled_cache(file_io.clone()))
+        } else if let Some(cache_size_bytes) = cache_size_bytes {
+            Arc::new(ObjectCache::new_with_capacity(
+                file_io.clone(),
+                cache_size_bytes,
+            ))
+        } else {
+            Arc::new(ObjectCache::new(file_io.clone()))
+        };
+
+        Ok(Table {
+            file_io,
+            metadata_location,
+            metadata,
+            identifier,
+            readonly,
+            object_cache,
+        })
+    }
+}
 
 /// Table represents a table in the catalog.
-#[derive(TypedBuilder, Debug, Clone)]
+#[derive(Debug, Clone)]
 pub struct Table {
     file_io: FileIO,
-    #[builder(default, setter(strip_option, into))]
     metadata_location: Option<String>,
-    #[builder(setter(into))]
     metadata: TableMetadataRef,
     identifier: TableIdent,
-    #[builder(default = false)]
     readonly: bool,
+    object_cache: Arc<ObjectCache>,
 }
 
 impl Table {
+    /// Returns a TableBuilder to build a table
+    pub fn builder() -> TableBuilder {
+        TableBuilder::new()
+    }
+
     /// Returns table identifier.
     pub fn identifier(&self) -> &TableIdent {
         &self.identifier
@@ -62,6 +190,11 @@ impl Table {
         &self.file_io
     }
 
+    /// Returns this table's object cache
+    pub(crate) fn object_cache(&self) -> Arc<ObjectCache> {
+        self.object_cache.clone()
+    }
+
     /// Creates a table scan.
     pub fn scan(&self) -> TableScanBuilder<'_> {
         TableScanBuilder::new(self)
@@ -117,11 +250,11 @@ impl StaticTable {
         let table = Table::builder()
             .metadata(metadata)
             .identifier(table_ident)
-            .file_io(file_io)
+            .file_io(file_io.clone())
             .readonly(true)
             .build();
 
-        Ok(Self(table))
+        Ok(Self(table?))
     }
     /// Creates a static table directly from metadata file and `FileIO`
     pub async fn from_metadata_file(
@@ -232,8 +365,9 @@ mod tests {
         let table = Table::builder()
             .metadata(table_metadata)
             .identifier(static_identifier)
-            .file_io(file_io)
-            .build();
+            .file_io(file_io.clone())
+            .build()
+            .unwrap();
         assert!(!table.readonly());
         assert_eq!(table.identifier.name(), "table");
     }
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index 966a021..d416383 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -234,6 +234,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
             .build()
+            .unwrap()
     }
 
     fn make_v2_table() -> Table {
@@ -252,6 +253,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
             .build()
+            .unwrap()
     }
 
     #[test]


Reply via email to