This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0666b4b feat: add ManifestEvaluator (#322)
0666b4b is described below
commit 0666b4bd075c150abb1fd0a01b2689c22da6f815
Author: Scott Donnelly <[email protected]>
AuthorDate: Mon Apr 22 14:32:36 2024 +0100
feat: add ManifestEvaluator (#322)
---
.../src/expr/visitors/manifest_evaluator.rs | 466 +++++++++++++++++++++
crates/iceberg/src/expr/visitors/mod.rs | 1 +
crates/iceberg/src/transform/mod.rs | 2 +-
3 files changed, 468 insertions(+), 1 deletion(-)
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
new file mode 100644
index 0000000..16d6481
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::visitors::bound_predicate_visitor::{visit,
BoundPredicateVisitor};
+use crate::expr::visitors::inclusive_projection::InclusiveProjection;
+use crate::expr::{Bind, BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema,
SchemaRef};
+use crate::{Error, ErrorKind};
+use fnv::FnvHashSet;
+use std::sync::Arc;
+
+/// Evaluates [`ManifestFile`]s to see if their partition summary matches a
provided
+/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of
[`ManifestFile`]s
+/// in which data might be found that matches the TableScan's filter.
+pub(crate) struct ManifestEvaluator {
+ partition_schema: SchemaRef,
+ partition_filter: BoundPredicate,
+ case_sensitive: bool,
+}
+
+impl ManifestEvaluator {
+ pub(crate) fn new(
+ partition_spec: PartitionSpecRef,
+ table_schema: SchemaRef,
+ filter: BoundPredicate,
+ case_sensitive: bool,
+ ) -> crate::Result<Self> {
+ let partition_type = partition_spec.partition_type(&table_schema)?;
+
+ // this is needed as SchemaBuilder.with_fields expects an iterator over
+ // Arc<NestedField> rather than &Arc<NestedField>
+ let cloned_partition_fields: Vec<_> =
+ partition_type.fields().iter().map(Arc::clone).collect();
+
+ // The partition_schema's schema_id is set to the partition
+ // spec's spec_id here, and used to perform a sanity check
+ // during eval to confirm that it matches the spec_id
+ // of the ManifestFile we're evaluating
+ let partition_schema = Schema::builder()
+ .with_schema_id(partition_spec.spec_id)
+ .with_fields(cloned_partition_fields)
+ .build()?;
+
+ let partition_schema_ref = Arc::new(partition_schema);
+
+ let mut inclusive_projection =
InclusiveProjection::new(partition_spec.clone());
+ let unbound_partition_filter = inclusive_projection.project(&filter)?;
+
+ let partition_filter = unbound_partition_filter
+ .rewrite_not()
+ .bind(partition_schema_ref.clone(), case_sensitive)?;
+
+ Ok(Self {
+ partition_schema: partition_schema_ref,
+ partition_filter,
+ case_sensitive,
+ })
+ }
+
+ /// Evaluate this `ManifestEvaluator`'s filter predicate against the
+ /// provided [`ManifestFile`]'s partitions. Used by [`TableScan`] to
+ /// see if this `ManifestFile` could possibly contain data that matches
+ /// the scan's filter.
+ pub(crate) fn eval(&self, manifest_file: &ManifestFile) ->
crate::Result<bool> {
+ if manifest_file.partitions.is_empty() {
+ return Ok(true);
+ }
+
+ // The schema_id of self.partition_schema is set to the
+ // spec_id of the partition spec that this ManifestEvaluator
+ // was created from in ManifestEvaluator::new
+ if self.partition_schema.schema_id() !=
manifest_file.partition_spec_id {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Partition ID for manifest file '{}' does not match
partition ID for the Scan",
+ &manifest_file.manifest_path
+ ),
+ ));
+ }
+
+ let mut evaluator = ManifestFilterVisitor::new(self,
&manifest_file.partitions);
+
+ visit(&mut evaluator, &self.partition_filter)
+ }
+}
+
+struct ManifestFilterVisitor<'a> {
+ manifest_evaluator: &'a ManifestEvaluator,
+ partitions: &'a Vec<FieldSummary>,
+}
+
+impl<'a> ManifestFilterVisitor<'a> {
+ fn new(manifest_evaluator: &'a ManifestEvaluator, partitions: &'a
Vec<FieldSummary>) -> Self {
+ ManifestFilterVisitor {
+ manifest_evaluator,
+ partitions,
+ }
+ }
+}
+
+// Remove this annotation once all todos have been removed
+#[allow(unused_variables)]
+impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
+ type T = bool;
+
+ fn always_true(&mut self) -> crate::Result<bool> {
+ Ok(true)
+ }
+
+ fn always_false(&mut self) -> crate::Result<bool> {
+ Ok(false)
+ }
+
+ fn and(&mut self, lhs: bool, rhs: bool) -> crate::Result<bool> {
+ Ok(lhs && rhs)
+ }
+
+ fn or(&mut self, lhs: bool, rhs: bool) -> crate::Result<bool> {
+ Ok(lhs || rhs)
+ }
+
+ fn not(&mut self, inner: bool) -> crate::Result<bool> {
+ Ok(!inner)
+ }
+
+ fn is_null(
+ &mut self,
+ reference: &BoundReference,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ Ok(self.field_summary_for_reference(reference).contains_null)
+ }
+
+ fn not_null(
+ &mut self,
+ reference: &BoundReference,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn is_nan(
+ &mut self,
+ reference: &BoundReference,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ Ok(self
+ .field_summary_for_reference(reference)
+ .contains_nan
+ .is_some())
+ }
+
+ fn not_nan(
+ &mut self,
+ reference: &BoundReference,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn less_than(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn less_than_or_eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn greater_than(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn greater_than_or_eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn not_eq(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn starts_with(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn not_starts_with(
+ &mut self,
+ reference: &BoundReference,
+ literal: &Datum,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn r#in(
+ &mut self,
+ reference: &BoundReference,
+ literals: &FnvHashSet<Datum>,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+
+ fn not_in(
+ &mut self,
+ reference: &BoundReference,
+ literals: &FnvHashSet<Datum>,
+ _predicate: &BoundPredicate,
+ ) -> crate::Result<bool> {
+ todo!()
+ }
+}
+
+impl ManifestFilterVisitor<'_> {
+ fn field_summary_for_reference(&self, reference: &BoundReference) ->
&FieldSummary {
+ let pos = reference.accessor().position();
+ &self.partitions[pos]
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
+ use crate::expr::{Bind, Predicate, PredicateOperator, Reference,
UnaryExpression};
+ use crate::spec::{
+ FieldSummary, ManifestContentType, ManifestFile, NestedField,
PartitionField,
+ PartitionSpec, PrimitiveType, Schema, Transform, Type,
+ };
+ use std::sync::Arc;
+
+ #[test]
+ fn test_manifest_file_no_partitions() {
+ let (table_schema_ref, partition_spec_ref) =
create_test_schema_and_partition_spec();
+
+ let partition_filter = Predicate::AlwaysTrue
+ .bind(table_schema_ref.clone(), false)
+ .unwrap();
+
+ let case_sensitive = false;
+
+ let manifest_file_partitions = vec![];
+ let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+
+ let manifest_evaluator = ManifestEvaluator::new(
+ partition_spec_ref,
+ table_schema_ref,
+ partition_filter,
+ case_sensitive,
+ )
+ .unwrap();
+
+ let result = manifest_evaluator.eval(&manifest_file).unwrap();
+
+ assert!(result);
+ }
+
+ #[test]
+ fn test_manifest_file_trivial_partition_passing_filter() {
+ let (table_schema_ref, partition_spec_ref) =
create_test_schema_and_partition_spec();
+
+ let partition_filter = Predicate::Unary(UnaryExpression::new(
+ PredicateOperator::IsNull,
+ Reference::new("a"),
+ ))
+ .bind(table_schema_ref.clone(), true)
+ .unwrap();
+
+ let manifest_file_partitions = vec![FieldSummary {
+ contains_null: true,
+ contains_nan: None,
+ lower_bound: None,
+ upper_bound: None,
+ }];
+ let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+
+ let manifest_evaluator =
+ ManifestEvaluator::new(partition_spec_ref, table_schema_ref,
partition_filter, true)
+ .unwrap();
+
+ let result = manifest_evaluator.eval(&manifest_file).unwrap();
+
+ assert!(result);
+ }
+
+ #[test]
+ fn test_manifest_file_partition_id_mismatch_returns_error() {
+ let (table_schema_ref, partition_spec_ref) =
+ create_test_schema_and_partition_spec_with_id_mismatch();
+
+ let partition_filter = Predicate::Unary(UnaryExpression::new(
+ PredicateOperator::IsNull,
+ Reference::new("a"),
+ ))
+ .bind(table_schema_ref.clone(), true)
+ .unwrap();
+
+ let manifest_file_partitions = vec![FieldSummary {
+ contains_null: true,
+ contains_nan: None,
+ lower_bound: None,
+ upper_bound: None,
+ }];
+ let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+
+ let manifest_evaluator =
+ ManifestEvaluator::new(partition_spec_ref, table_schema_ref,
partition_filter, true)
+ .unwrap();
+
+ let result = manifest_evaluator.eval(&manifest_file);
+
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_manifest_file_trivial_partition_rejected_filter() {
+ let (table_schema_ref, partition_spec_ref) =
create_test_schema_and_partition_spec();
+
+ let partition_filter = Predicate::Unary(UnaryExpression::new(
+ PredicateOperator::IsNan,
+ Reference::new("a"),
+ ))
+ .bind(table_schema_ref.clone(), true)
+ .unwrap();
+
+ let manifest_file_partitions = vec![FieldSummary {
+ contains_null: false,
+ contains_nan: None,
+ lower_bound: None,
+ upper_bound: None,
+ }];
+ let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+
+ let manifest_evaluator =
+ ManifestEvaluator::new(partition_spec_ref, table_schema_ref,
partition_filter, true)
+ .unwrap();
+
+ let result = manifest_evaluator.eval(&manifest_file).unwrap();
+
+ assert!(!result);
+ }
+
+ fn create_test_schema_and_partition_spec() -> (Arc<Schema>,
Arc<PartitionSpec>) {
+ let table_schema = Schema::builder()
+ .with_fields(vec![Arc::new(NestedField::optional(
+ 1,
+ "a",
+ Type::Primitive(PrimitiveType::Float),
+ ))])
+ .build()
+ .unwrap();
+ let table_schema_ref = Arc::new(table_schema);
+
+ let partition_spec = PartitionSpec::builder()
+ .with_spec_id(1)
+ .with_fields(vec![PartitionField::builder()
+ .source_id(1)
+ .name("a".to_string())
+ .field_id(1)
+ .transform(Transform::Identity)
+ .build()])
+ .build()
+ .unwrap();
+ let partition_spec_ref = Arc::new(partition_spec);
+ (table_schema_ref, partition_spec_ref)
+ }
+
+ fn create_test_schema_and_partition_spec_with_id_mismatch() ->
(Arc<Schema>, Arc<PartitionSpec>)
+ {
+ let table_schema = Schema::builder()
+ .with_fields(vec![Arc::new(NestedField::optional(
+ 1,
+ "a",
+ Type::Primitive(PrimitiveType::Float),
+ ))])
+ .build()
+ .unwrap();
+ let table_schema_ref = Arc::new(table_schema);
+
+ let partition_spec = PartitionSpec::builder()
+ // Spec ID here deliberately doesn't match the one from
create_test_manifest_file
+ .with_spec_id(999)
+ .with_fields(vec![PartitionField::builder()
+ .source_id(1)
+ .name("a".to_string())
+ .field_id(1)
+ .transform(Transform::Identity)
+ .build()])
+ .build()
+ .unwrap();
+ let partition_spec_ref = Arc::new(partition_spec);
+ (table_schema_ref, partition_spec_ref)
+ }
+
+ fn create_test_manifest_file(manifest_file_partitions: Vec<FieldSummary>)
-> ManifestFile {
+ ManifestFile {
+ manifest_path: "/test/path".to_string(),
+ manifest_length: 0,
+ partition_spec_id: 1,
+ content: ManifestContentType::Data,
+ sequence_number: 0,
+ min_sequence_number: 0,
+ added_snapshot_id: 0,
+ added_data_files_count: None,
+ existing_data_files_count: None,
+ deleted_data_files_count: None,
+ added_rows_count: None,
+ existing_rows_count: None,
+ deleted_rows_count: None,
+ partitions: manifest_file_partitions,
+ key_metadata: vec![],
+ }
+ }
+}
diff --git a/crates/iceberg/src/expr/visitors/mod.rs
b/crates/iceberg/src/expr/visitors/mod.rs
index 47651d2..709ccd6 100644
--- a/crates/iceberg/src/expr/visitors/mod.rs
+++ b/crates/iceberg/src/expr/visitors/mod.rs
@@ -17,3 +17,4 @@
pub(crate) mod bound_predicate_visitor;
pub(crate) mod inclusive_projection;
+pub(crate) mod manifest_evaluator;
diff --git a/crates/iceberg/src/transform/mod.rs
b/crates/iceberg/src/transform/mod.rs
index b515666..cb221a2 100644
--- a/crates/iceberg/src/transform/mod.rs
+++ b/crates/iceberg/src/transform/mod.rs
@@ -147,7 +147,7 @@ mod test {
}
}
- /// A utitily struct, test fixture
+ /// A utility struct, test fixture
/// used for testing the transform on `Transform`
pub(crate) struct TestTransformFixture {
pub display: String,