This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a1ec0fa Object Cache: caches parsed Manifests and ManifestLists for
performance (#512)
a1ec0fa is described below
commit a1ec0fa98113fc02b2479d81759fccd9dab10378
Author: Scott Donnelly <[email protected]>
AuthorDate: Mon Aug 19 10:48:43 2024 +0100
Object Cache: caches parsed Manifests and ManifestLists for performance
(#512)
* feat: adds ObjectCache, to cache Manifests and ManifestLists
* refactor: change obj cache method names and use more readable default
usize value
* chore: improve error message
Co-authored-by: Renjie Liu <[email protected]>
* fix: change object cache retrieval method visibility
Co-authored-by: Renjie Liu <[email protected]>
* feat: improved error message in object cache get_manifest
* test(object-cache): add unit tests for object cache manifest and manifest
list retrieval
* fix: ensure that object cache insertions are weighted by size
* test: fix test typo
* fix: ensure object cache weight is that of the wrapped item, not the Arc
---------
Co-authored-by: Renjie Liu <[email protected]>
---
crates/catalog/glue/src/catalog.rs | 12 +-
crates/catalog/hms/src/catalog.rs | 12 +-
crates/catalog/memory/src/catalog.rs | 13 +-
crates/catalog/rest/src/catalog.rs | 16 +-
crates/iceberg/Cargo.toml | 1 +
crates/iceberg/src/io/mod.rs | 2 +
crates/iceberg/src/io/object_cache.rs | 402 ++++++++++++++++++++++++++++++
crates/iceberg/src/scan.rs | 49 ++--
crates/iceberg/src/spec/manifest.rs | 2 +-
crates/iceberg/src/spec/table_metadata.rs | 2 +-
crates/iceberg/src/table.rs | 154 +++++++++++-
crates/iceberg/src/transaction.rs | 2 +
12 files changed, 599 insertions(+), 68 deletions(-)
diff --git a/crates/catalog/glue/src/catalog.rs
b/crates/catalog/glue/src/catalog.rs
index 16acaa7..18e30f3 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -381,14 +381,12 @@ impl Catalog for GlueCatalog {
builder.send().await.map_err(from_aws_sdk_error)?;
- let table = Table::builder()
+ Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name),
table_name))
- .build();
-
- Ok(table)
+ .build()
}
/// Loads a table from the Glue Catalog and constructs a `Table` object
@@ -432,7 +430,7 @@ impl Catalog for GlueCatalog {
let metadata_content = input_file.read().await?;
let metadata =
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
- let table = Table::builder()
+ Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
@@ -440,9 +438,7 @@ impl Catalog for GlueCatalog {
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
- .build();
-
- Ok(table)
+ .build()
}
}
}
diff --git a/crates/catalog/hms/src/catalog.rs
b/crates/catalog/hms/src/catalog.rs
index 524ecee..6e5db19 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -369,14 +369,12 @@ impl Catalog for HmsCatalog {
.await
.map_err(from_thrift_error)?;
- let table = Table::builder()
+ Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name),
table_name))
- .build();
-
- Ok(table)
+ .build()
}
/// Loads a table from the Hive Metastore and constructs a `Table` object
@@ -407,7 +405,7 @@ impl Catalog for HmsCatalog {
let metadata_content =
self.file_io.new_input(&metadata_location)?.read().await?;
let metadata =
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
- let table = Table::builder()
+ Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
@@ -415,9 +413,7 @@ impl Catalog for HmsCatalog {
NamespaceIdent::new(db_name),
table.name.clone(),
))
- .build();
-
- Ok(table)
+ .build()
}
/// Asynchronously drops a table from the database.
diff --git a/crates/catalog/memory/src/catalog.rs
b/crates/catalog/memory/src/catalog.rs
index 44086f8..05038cb 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -209,14 +209,12 @@ impl Catalog for MemoryCatalog {
root_namespace_state.insert_new_table(&table_ident,
metadata_location.clone())?;
- let table = Table::builder()
+ Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(table_ident)
- .build();
-
- Ok(table)
+ .build()
}
/// Load table from the catalog.
@@ -227,14 +225,13 @@ impl Catalog for MemoryCatalog {
let input_file = self.file_io.new_input(metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata =
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
- let table = Table::builder()
+
+ Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location.clone())
.metadata(metadata)
.identifier(table_ident.clone())
- .build();
-
- Ok(table)
+ .build()
}
/// Drop a table from the catalog.
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index d74c8de..2afefad 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -516,7 +516,7 @@ impl Catalog for RestCatalog {
.load_file_io(resp.metadata_location.as_deref(), resp.config)
.await?;
- let table = Table::builder()
+ Table::builder()
.identifier(table_ident)
.file_io(file_io)
.metadata(resp.metadata)
@@ -526,9 +526,7 @@ impl Catalog for RestCatalog {
"Metadata location missing in create table response!",
)
})?)
- .build();
-
- Ok(table)
+ .build()
}
/// Load table from the catalog.
@@ -560,9 +558,9 @@ impl Catalog for RestCatalog {
.metadata(resp.metadata);
if let Some(metadata_location) = resp.metadata_location {
- Ok(table_builder.metadata_location(metadata_location).build())
+ table_builder.metadata_location(metadata_location).build()
} else {
- Ok(table_builder.build())
+ table_builder.build()
}
}
@@ -661,12 +659,12 @@ impl Catalog for RestCatalog {
let file_io = self
.load_file_io(Some(&resp.metadata_location), None)
.await?;
- Ok(Table::builder()
+ Table::builder()
.identifier(commit.identifier().clone())
.file_io(file_io)
.metadata(resp.metadata)
.metadata_location(resp.metadata_location)
- .build())
+ .build()
}
}
@@ -1661,6 +1659,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
+ .unwrap()
};
let table = Transaction::new(&table1)
@@ -1785,6 +1784,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
+ .unwrap()
};
let table_result = Transaction::new(&table1)
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 8b0a7e1..6218e98 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -60,6 +60,7 @@ derive_builder = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
+moka = { version = "0.12.8", features = ["future"] }
murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index b8b7e27..52a1da2 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -78,8 +78,10 @@ use storage_memory::*;
mod storage_s3;
#[cfg(feature = "storage-s3")]
pub use storage_s3::*;
+pub(crate) mod object_cache;
#[cfg(feature = "storage-fs")]
mod storage_fs;
+
#[cfg(feature = "storage-fs")]
use storage_fs::*;
#[cfg(feature = "storage-gcs")]
diff --git a/crates/iceberg/src/io/object_cache.rs
b/crates/iceberg/src/io/object_cache.rs
new file mode 100644
index 0000000..3b89a4e
--- /dev/null
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::io::FileIO;
+use crate::spec::{
+ FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId,
SnapshotRef, TableMetadataRef,
+};
+use crate::{Error, ErrorKind, Result};
+
+const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32MB
+
+#[derive(Clone, Debug)]
+pub(crate) enum CachedItem {
+ ManifestList(Arc<ManifestList>),
+ Manifest(Arc<Manifest>),
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub(crate) enum CachedObjectKey {
+ ManifestList((String, FormatVersion, SchemaId)),
+ Manifest(String),
+}
+
+/// Caches metadata objects deserialized from immutable files
+#[derive(Clone, Debug)]
+pub struct ObjectCache {
+ cache: moka::future::Cache<CachedObjectKey, CachedItem>,
+ file_io: FileIO,
+ cache_disabled: bool,
+}
+
+impl ObjectCache {
+ /// Creates a new [`ObjectCache`]
+ /// with the default cache size
+ pub(crate) fn new(file_io: FileIO) -> Self {
+ Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES)
+ }
+
+ /// Creates a new [`ObjectCache`]
+ /// with a specific cache size
+ pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) ->
Self {
+ if cache_size_bytes == 0 {
+ Self::with_disabled_cache(file_io)
+ } else {
+ Self {
+ cache: moka::future::Cache::builder()
+ .weigher(|_, val: &CachedItem| match val {
+ CachedItem::ManifestList(item) =>
size_of_val(item.as_ref()),
+ CachedItem::Manifest(item) =>
size_of_val(item.as_ref()),
+ } as u32)
+ .max_capacity(cache_size_bytes)
+ .build(),
+ file_io,
+ cache_disabled: false,
+ }
+ }
+ }
+
+ /// Creates a new [`ObjectCache`]
+ /// with caching disabled
+ pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
+ Self {
+ cache: moka::future::Cache::new(0),
+ file_io,
+ cache_disabled: true,
+ }
+ }
+
+ /// Retrieves an Arc [`Manifest`] from the cache
+ /// or retrieves one from FileIO and parses it if not present
+ pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) ->
Result<Arc<Manifest>> {
+ if self.cache_disabled {
+ return manifest_file
+ .load_manifest(&self.file_io)
+ .await
+ .map(Arc::new);
+ }
+
+ let key =
CachedObjectKey::Manifest(manifest_file.manifest_path.clone());
+
+ let cache_entry = self
+ .cache
+ .entry_by_ref(&key)
+ .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file))
+ .await
+ .map_err(|err| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to load manifest {}",
manifest_file.manifest_path),
+ )
+ .with_source(err)
+ })?
+ .into_value();
+
+ match cache_entry {
+ CachedItem::Manifest(arc_manifest) => Ok(arc_manifest),
+ _ => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("cached object for key '{:?}' is not a Manifest", key),
+ )),
+ }
+ }
+
+ /// Retrieves an Arc [`ManifestList`] from the cache
+ /// or retrieves one from FileIO and parses it if not present
+ pub(crate) async fn get_manifest_list(
+ &self,
+ snapshot: &SnapshotRef,
+ table_metadata: &TableMetadataRef,
+ ) -> Result<Arc<ManifestList>> {
+ if self.cache_disabled {
+ return snapshot
+ .load_manifest_list(&self.file_io, table_metadata)
+ .await
+ .map(Arc::new);
+ }
+
+ let key = CachedObjectKey::ManifestList((
+ snapshot.manifest_list().to_string(),
+ table_metadata.format_version,
+ snapshot.schema_id().unwrap(),
+ ));
+ let cache_entry = self
+ .cache
+ .entry_by_ref(&key)
+ .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot,
table_metadata))
+ .await
+ .map_err(|err| Error::new(ErrorKind::Unexpected,
err.as_ref().message()))?
+ .into_value();
+
+ match cache_entry {
+ CachedItem::ManifestList(arc_manifest_list) =>
Ok(arc_manifest_list),
+ _ => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("cached object for path '{:?}' is not a manifest
list", key),
+ )),
+ }
+ }
+
+ async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) ->
Result<CachedItem> {
+ let manifest = manifest_file.load_manifest(&self.file_io).await?;
+
+ Ok(CachedItem::Manifest(Arc::new(manifest)))
+ }
+
+ async fn fetch_and_parse_manifest_list(
+ &self,
+ snapshot: &SnapshotRef,
+ table_metadata: &TableMetadataRef,
+ ) -> Result<CachedItem> {
+ let manifest_list = snapshot
+ .load_manifest_list(&self.file_io, table_metadata)
+ .await?;
+
+ Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::fs;
+
+ use tempfile::TempDir;
+ use tera::{Context, Tera};
+ use uuid::Uuid;
+
+ use super::*;
+ use crate::io::{FileIO, OutputFile};
+ use crate::spec::{
+ DataContentType, DataFileBuilder, DataFileFormat, FormatVersion,
Literal, Manifest,
+ ManifestContentType, ManifestEntry, ManifestListWriter,
ManifestMetadata, ManifestStatus,
+ ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
+ };
+ use crate::table::Table;
+ use crate::TableIdent;
+
+ struct TableTestFixture {
+ table_location: String,
+ table: Table,
+ }
+
+ impl TableTestFixture {
+ fn new() -> Self {
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().join("table1");
+ let manifest_list1_location =
table_location.join("metadata/manifests_list_1.avro");
+ let manifest_list2_location =
table_location.join("metadata/manifests_list_2.avro");
+ let table_metadata1_location =
table_location.join("metadata/v1.json");
+
+ let file_io =
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let table_metadata = {
+ let template_json_str = fs::read_to_string(format!(
+ "{}/testdata/example_table_metadata_v2.json",
+ env!("CARGO_MANIFEST_DIR")
+ ))
+ .unwrap();
+ let mut context = Context::new();
+ context.insert("table_location", &table_location);
+ context.insert("manifest_list_1_location",
&manifest_list1_location);
+ context.insert("manifest_list_2_location",
&manifest_list2_location);
+ context.insert("table_metadata_1_location",
&table_metadata1_location);
+
+ let metadata_json = Tera::one_off(&template_json_str,
&context, false).unwrap();
+ serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+ };
+
+ let table = Table::builder()
+ .metadata(table_metadata)
+ .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+ .file_io(file_io.clone())
+
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+ .build()
+ .unwrap();
+
+ Self {
+ table_location: table_location.to_str().unwrap().to_string(),
+ table,
+ }
+ }
+
+ fn next_manifest_file(&self) -> OutputFile {
+ self.table
+ .file_io()
+ .new_output(format!(
+ "{}/metadata/manifest_{}.avro",
+ self.table_location,
+ Uuid::new_v4()
+ ))
+ .unwrap()
+ }
+
+ async fn setup_manifest_files(&mut self) {
+ let current_snapshot =
self.table.metadata().current_snapshot().unwrap();
+ let current_schema =
current_snapshot.schema(self.table.metadata()).unwrap();
+ let current_partition_spec =
self.table.metadata().default_partition_spec().unwrap();
+
+ // Write data files
+ let data_file_manifest = ManifestWriter::new(
+ self.next_manifest_file(),
+ current_snapshot.snapshot_id(),
+ vec![],
+ )
+ .write(Manifest::new(
+ ManifestMetadata::builder()
+ .schema((*current_schema).clone())
+ .content(ManifestContentType::Data)
+ .format_version(FormatVersion::V2)
+ .partition_spec((**current_partition_spec).clone())
+ .schema_id(current_schema.schema_id())
+ .build(),
+ vec![ManifestEntry::builder()
+ .status(ManifestStatus::Added)
+ .data_file(
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(format!("{}/1.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+
.partition(Struct::from_iter([Some(Literal::long(100))]))
+ .build()
+ .unwrap(),
+ )
+ .build()],
+ ))
+ .await
+ .unwrap();
+
+ // Write to manifest list
+ let mut manifest_list_write = ManifestListWriter::v2(
+ self.table
+ .file_io()
+ .new_output(current_snapshot.manifest_list())
+ .unwrap(),
+ current_snapshot.snapshot_id(),
+ current_snapshot
+ .parent_snapshot_id()
+ .unwrap_or(EMPTY_SNAPSHOT_ID),
+ current_snapshot.sequence_number(),
+ );
+ manifest_list_write
+ .add_manifests(vec![data_file_manifest].into_iter())
+ .unwrap();
+ manifest_list_write.close().await.unwrap();
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_manifest_list_and_manifest_from_disabled_cache() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ let object_cache =
ObjectCache::with_disabled_cache(fixture.table.file_io().clone());
+
+ let result_manifest_list = object_cache
+ .get_manifest_list(
+ fixture.table.metadata().current_snapshot().unwrap(),
+ &fixture.table.metadata_ref(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(result_manifest_list.entries().len(), 1);
+
+ let manifest_file = result_manifest_list.entries().first().unwrap();
+ let result_manifest =
object_cache.get_manifest(manifest_file).await.unwrap();
+
+ assert_eq!(
+ result_manifest
+ .entries()
+ .first()
+ .unwrap()
+ .file_path()
+ .split("/")
+ .last()
+ .unwrap(),
+ "1.parquet"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_get_manifest_list_and_manifest_from_default_cache() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ let object_cache = ObjectCache::new(fixture.table.file_io().clone());
+
+ // not in cache
+ let result_manifest_list = object_cache
+ .get_manifest_list(
+ fixture.table.metadata().current_snapshot().unwrap(),
+ &fixture.table.metadata_ref(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(result_manifest_list.entries().len(), 1);
+
+ // retrieve cached version
+ let result_manifest_list = object_cache
+ .get_manifest_list(
+ fixture.table.metadata().current_snapshot().unwrap(),
+ &fixture.table.metadata_ref(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(result_manifest_list.entries().len(), 1);
+
+ let manifest_file = result_manifest_list.entries().first().unwrap();
+
+ // not in cache
+ let result_manifest =
object_cache.get_manifest(manifest_file).await.unwrap();
+
+ assert_eq!(
+ result_manifest
+ .entries()
+ .first()
+ .unwrap()
+ .file_path()
+ .split("/")
+ .last()
+ .unwrap(),
+ "1.parquet"
+ );
+
+ // retrieve cached version
+ let result_manifest =
object_cache.get_manifest(manifest_file).await.unwrap();
+
+ assert_eq!(
+ result_manifest
+ .entries()
+ .first()
+ .unwrap()
+ .file_path()
+ .split("/")
+ .last()
+ .unwrap(),
+ "1.parquet"
+ );
+ }
+}
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 8a178dd..04aa1f5 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -32,6 +32,7 @@ use
crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
@@ -242,7 +243,7 @@ impl<'a> TableScanBuilder<'a> {
case_sensitive: self.case_sensitive,
predicate: self.filter.map(Arc::new),
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
- file_io: self.table.file_io().clone(),
+ object_cache: self.table.object_cache(),
field_ids: Arc::new(field_ids),
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
@@ -292,7 +293,7 @@ struct PlanContext {
case_sensitive: bool,
predicate: Option<Arc<Predicate>>,
snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
- file_io: FileIO,
+ object_cache: Arc<ObjectCache>,
field_ids: Arc<Vec<i32>>,
partition_filter_cache: Arc<PartitionFilterCache>,
@@ -454,8 +455,8 @@ struct ManifestFileContext {
sender: Sender<ManifestEntryContext>,
field_ids: Arc<Vec<i32>>,
- file_io: FileIO,
bound_predicates: Option<Arc<BoundPredicates>>,
+ object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
}
@@ -477,24 +478,22 @@ impl ManifestFileContext {
/// streaming its constituent [`ManifestEntries`] to the channel provided
in the context
async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> {
let ManifestFileContext {
- file_io,
+ object_cache,
manifest_file,
bound_predicates,
snapshot_schema,
field_ids,
- expression_evaluator_cache,
mut sender,
+ expression_evaluator_cache,
..
} = self;
- let file_io_cloned = file_io.clone();
- let manifest = manifest_file.load_manifest(&file_io_cloned).await?;
-
- let (entries, _) = manifest.consume();
+ let manifest = object_cache.get_manifest(&manifest_file).await?;
- for manifest_entry in entries.into_iter() {
+ for manifest_entry in manifest.entries() {
let manifest_entry_context = ManifestEntryContext {
- manifest_entry,
+ // TODO: refactor to avoid clone
+ manifest_entry: manifest_entry.clone(),
expression_evaluator_cache: expression_evaluator_cache.clone(),
field_ids: field_ids.clone(),
partition_spec_id: manifest_file.partition_spec_id,
@@ -530,9 +529,10 @@ impl ManifestEntryContext {
}
impl PlanContext {
- async fn get_manifest_list(&self) -> Result<ManifestList> {
- self.snapshot
- .load_manifest_list(&self.file_io, &self.table_metadata)
+ async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> {
+ self.object_cache
+ .as_ref()
+ .get_manifest_list(&self.snapshot, &self.table_metadata)
.await
}
@@ -559,19 +559,19 @@ impl PlanContext {
fn build_manifest_file_contexts(
&self,
- manifest_list: ManifestList,
+ manifest_list: Arc<ManifestList>,
sender: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
let filtered_entries = manifest_list
- .consume_entries()
- .into_iter()
+ .entries()
+ .iter()
.filter(|manifest_file| manifest_file.content ==
ManifestContentType::Data);
// TODO: Ideally we could ditch this intermediate Vec as we return an
iterator.
let mut filtered_mfcs = vec![];
if self.predicate.is_some() {
for manifest_file in filtered_entries {
- let partition_bound_predicate =
self.get_partition_filter(&manifest_file)?;
+ let partition_bound_predicate =
self.get_partition_filter(manifest_file)?;
// evaluate the ManifestFile against the partition filter. Skip
// if it cannot contain any matching rows
@@ -581,7 +581,7 @@ impl PlanContext {
manifest_file.partition_spec_id,
partition_bound_predicate.clone(),
)
- .eval(&manifest_file)?
+ .eval(manifest_file)?
{
let mfc = self.create_manifest_file_context(
manifest_file,
@@ -603,7 +603,7 @@ impl PlanContext {
fn create_manifest_file_context(
&self,
- manifest_file: ManifestFile,
+ manifest_file: &ManifestFile,
partition_filter: Option<Arc<BoundPredicate>>,
sender: Sender<ManifestEntryContext>,
) -> ManifestFileContext {
@@ -620,10 +620,10 @@ impl PlanContext {
};
ManifestFileContext {
- manifest_file,
+ manifest_file: manifest_file.clone(),
bound_predicates,
sender,
- file_io: self.file_io.clone(),
+ object_cache: self.object_cache.clone(),
snapshot_schema: self.snapshot_schema.clone(),
field_ids: self.field_ids.clone(),
expression_evaluator_cache:
self.expression_evaluator_cache.clone(),
@@ -938,9 +938,10 @@ mod tests {
let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
- .file_io(file_io)
+ .file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
- .build();
+ .build()
+ .unwrap();
Self {
table_location: table_location.to_str().unwrap().to_string(),
diff --git a/crates/iceberg/src/spec/manifest.rs
b/crates/iceberg/src/spec/manifest.rs
index 14b8a80..eb3d022 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -95,7 +95,7 @@ impl Manifest {
}
/// Consume this Manifest, returning its constituent parts
- pub fn consume(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
+ pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
let Self { entries, metadata } = self;
(entries, metadata)
}
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index cd7f046..83b6017 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -856,7 +856,7 @@ pub(super) mod _serde {
}
}
-#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy,
Hash)]
#[repr(u8)]
/// Iceberg format version
pub enum FormatVersion {
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index d28d6e5..406f9dd 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -16,28 +16,156 @@
// under the License.
//! Table API for Apache Iceberg
-use typed_builder::TypedBuilder;
+
+use std::sync::Arc;
use crate::arrow::ArrowReaderBuilder;
+use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::scan::TableScanBuilder;
use crate::spec::{TableMetadata, TableMetadataRef};
-use crate::{Result, TableIdent};
+use crate::{Error, ErrorKind, Result, TableIdent};
+
+/// Builder to create a [`Table`].
+pub struct TableBuilder {
+ file_io: Option<FileIO>,
+ metadata_location: Option<String>,
+ metadata: Option<TableMetadataRef>,
+ identifier: Option<TableIdent>,
+ readonly: bool,
+ disable_cache: bool,
+ cache_size_bytes: Option<u64>,
+}
+
+impl TableBuilder {
+ pub(crate) fn new() -> Self {
+ Self {
+ file_io: None,
+ metadata_location: None,
+ metadata: None,
+ identifier: None,
+ readonly: false,
+ disable_cache: false,
+ cache_size_bytes: None,
+ }
+ }
+
+ /// required - sets the necessary FileIO to use for the table
+ pub fn file_io(mut self, file_io: FileIO) -> Self {
+ self.file_io = Some(file_io);
+ self
+ }
+
+ /// optional - sets the table's metadata location
+ pub fn metadata_location<T: Into<String>>(mut self, metadata_location: T)
-> Self {
+ self.metadata_location = Some(metadata_location.into());
+ self
+ }
+
+ /// required - passes in the TableMetadata to use for the Table
+ pub fn metadata<T: Into<TableMetadataRef>>(mut self, metadata: T) -> Self {
+ self.metadata = Some(metadata.into());
+ self
+ }
+
+ /// required - passes in the TableIdent to use for the Table
+ pub fn identifier(mut self, identifier: TableIdent) -> Self {
+ self.identifier = Some(identifier);
+ self
+ }
+
+ /// specifies if the Table is readonly or not (default not)
+ pub fn readonly(mut self, readonly: bool) -> Self {
+ self.readonly = readonly;
+ self
+ }
+
+ /// specifies if the Table's metadata cache will be disabled,
+ /// so that reads of Manifests and ManifestLists will never
+ /// get cached.
+ pub fn disable_cache(mut self) -> Self {
+ self.disable_cache = true;
+ self
+ }
+
+ /// optionally set a non-default metadata cache size
+ pub fn cache_size_bytes(mut self, cache_size_bytes: u64) -> Self {
+ self.cache_size_bytes = Some(cache_size_bytes);
+ self
+ }
+
+ /// build the Table
+ pub fn build(self) -> Result<Table> {
+ let Self {
+ file_io,
+ metadata_location,
+ metadata,
+ identifier,
+ readonly,
+ disable_cache,
+ cache_size_bytes,
+ } = self;
+
+ let Some(file_io) = file_io else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "FileIO must be provided with TableBuilder.file_io()",
+ ));
+ };
+
+ let Some(metadata) = metadata else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "TableMetadataRef must be provided with
TableBuilder.metadata()",
+ ));
+ };
+
+ let Some(identifier) = identifier else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "TableIdent must be provided with TableBuilder.identifier()",
+ ));
+ };
+
+ let object_cache = if disable_cache {
+ Arc::new(ObjectCache::with_disabled_cache(file_io.clone()))
+ } else if let Some(cache_size_bytes) = cache_size_bytes {
+ Arc::new(ObjectCache::new_with_capacity(
+ file_io.clone(),
+ cache_size_bytes,
+ ))
+ } else {
+ Arc::new(ObjectCache::new(file_io.clone()))
+ };
+
+ Ok(Table {
+ file_io,
+ metadata_location,
+ metadata,
+ identifier,
+ readonly,
+ object_cache,
+ })
+ }
+}
/// Table represents a table in the catalog.
-#[derive(TypedBuilder, Debug, Clone)]
+#[derive(Debug, Clone)]
pub struct Table {
file_io: FileIO,
- #[builder(default, setter(strip_option, into))]
metadata_location: Option<String>,
- #[builder(setter(into))]
metadata: TableMetadataRef,
identifier: TableIdent,
- #[builder(default = false)]
readonly: bool,
+ object_cache: Arc<ObjectCache>,
}
impl Table {
+ /// Returns a TableBuilder to build a table
+ pub fn builder() -> TableBuilder {
+ TableBuilder::new()
+ }
+
/// Returns table identifier.
pub fn identifier(&self) -> &TableIdent {
&self.identifier
@@ -62,6 +190,11 @@ impl Table {
&self.file_io
}
+ /// Returns this table's object cache
+ pub(crate) fn object_cache(&self) -> Arc<ObjectCache> {
+ self.object_cache.clone()
+ }
+
/// Creates a table scan.
pub fn scan(&self) -> TableScanBuilder<'_> {
TableScanBuilder::new(self)
@@ -117,11 +250,11 @@ impl StaticTable {
let table = Table::builder()
.metadata(metadata)
.identifier(table_ident)
- .file_io(file_io)
+ .file_io(file_io.clone())
.readonly(true)
.build();
- Ok(Self(table))
+ Ok(Self(table?))
}
/// Creates a static table directly from metadata file and `FileIO`
pub async fn from_metadata_file(
@@ -232,8 +365,9 @@ mod tests {
let table = Table::builder()
.metadata(table_metadata)
.identifier(static_identifier)
- .file_io(file_io)
- .build();
+ .file_io(file_io.clone())
+ .build()
+ .unwrap();
assert!(!table.readonly());
assert_eq!(table.identifier.name(), "table");
}
diff --git a/crates/iceberg/src/transaction.rs
b/crates/iceberg/src/transaction.rs
index 966a021..d416383 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -234,6 +234,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
+ .unwrap()
}
fn make_v2_table() -> Table {
@@ -252,6 +253,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
+ .unwrap()
}
#[test]