This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7dfc548 Refactor: Extract `partition_filters` from `ManifestEvaluator` (#360)
7dfc548 is described below
commit 7dfc548ebe42569fd7ca2143a13afdaeff8a9883
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Tue Apr 30 10:18:44 2024 +0200
Refactor: Extract `partition_filters` from `ManifestEvaluator` (#360)
* refactor: extract inclusive_projection from manifest_evaluator
* refactor: add FileScanStreamContext
* refactor: create partition_spec and partition_schema
* refactor: add cache structs
* refactor: use entry in partition_file_cache
* refactor: use result
* chore: update docs + fmt
* refactor: add bound_filter to FileScanStreamContext
* refactor: return ref BoundPredicate
* fix: return type PartitionSpecRef
* refactor: remove spec_id runtime check
* feat: add check for content_type data
---
crates/iceberg/src/expr/predicate.rs | 10 +-
.../src/expr/visitors/manifest_evaluator.rs | 347 ++++++++++-----------
crates/iceberg/src/scan.rs | 275 ++++++++++++----
3 files changed, 371 insertions(+), 261 deletions(-)
diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs
index 6cdf4fc..1457d5a 100644
--- a/crates/iceberg/src/expr/predicate.rs
+++ b/crates/iceberg/src/expr/predicate.rs
@@ -32,7 +32,7 @@ use crate::spec::{Datum, SchemaRef};
use crate::{Error, ErrorKind};
/// Logical expression, such as `AND`, `OR`, `NOT`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
pub struct LogicalExpression<T, const N: usize> {
inputs: [Box<T>; N],
}
@@ -79,7 +79,7 @@ where
}
/// Unary predicate, for example, `a IS NULL`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
pub struct UnaryExpression<T> {
/// Operator of this predicate, must be single operand operator.
op: PredicateOperator,
@@ -126,7 +126,7 @@ impl<T> UnaryExpression<T> {
}
/// Binary predicate, for example, `a > 10`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
pub struct BinaryExpression<T> {
/// Operator of this predicate, must be binary operator, such as `=`, `>`, `<`, etc.
op: PredicateOperator,
@@ -184,7 +184,7 @@ impl<T: Bind> Bind for BinaryExpression<T> {
}
/// Set predicates, for example, `a in (1, 2, 3)`.
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
pub struct SetExpression<T> {
/// Operator of this predicate, must be set operator, such as `IN`, `NOT IN`, etc.
op: PredicateOperator,
@@ -613,7 +613,7 @@ impl Not for Predicate {
}
/// Bound predicate expression after binding to a schema.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum BoundPredicate {
/// An expression always evaluates to true.
AlwaysTrue,
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index 16d6481..bcb5967 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -16,83 +16,39 @@
// under the License.
use crate::expr::visitors::bound_predicate_visitor::{visit,
BoundPredicateVisitor};
-use crate::expr::visitors::inclusive_projection::InclusiveProjection;
-use crate::expr::{Bind, BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema,
SchemaRef};
-use crate::{Error, ErrorKind};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::Result;
use fnv::FnvHashSet;
-use std::sync::Arc;
-/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided
-/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s
+#[derive(Debug)]
+/// Evaluates a [`ManifestFile`] to see if the partition summaries
+/// match a provided [`BoundPredicate`].
+///
+/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
/// in which data might be found that matches the TableScan's filter.
pub(crate) struct ManifestEvaluator {
- partition_schema: SchemaRef,
partition_filter: BoundPredicate,
case_sensitive: bool,
}
impl ManifestEvaluator {
- pub(crate) fn new(
- partition_spec: PartitionSpecRef,
- table_schema: SchemaRef,
- filter: BoundPredicate,
- case_sensitive: bool,
- ) -> crate::Result<Self> {
- let partition_type = partition_spec.partition_type(&table_schema)?;
-
- // this is needed as SchemaBuilder.with_fields expects an iterator over
- // Arc<NestedField> rather than &Arc<NestedField>
- let cloned_partition_fields: Vec<_> =
- partition_type.fields().iter().map(Arc::clone).collect();
-
- // The partition_schema's schema_id is set to the partition
- // spec's spec_id here, and used to perform a sanity check
- // during eval to confirm that it matches the spec_id
- // of the ManifestFile we're evaluating
- let partition_schema = Schema::builder()
- .with_schema_id(partition_spec.spec_id)
- .with_fields(cloned_partition_fields)
- .build()?;
-
- let partition_schema_ref = Arc::new(partition_schema);
-
- let mut inclusive_projection =
InclusiveProjection::new(partition_spec.clone());
- let unbound_partition_filter = inclusive_projection.project(&filter)?;
-
- let partition_filter = unbound_partition_filter
- .rewrite_not()
- .bind(partition_schema_ref.clone(), case_sensitive)?;
-
- Ok(Self {
- partition_schema: partition_schema_ref,
+ pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool)
-> Self {
+ Self {
partition_filter,
case_sensitive,
- })
+ }
}
/// Evaluate this `ManifestEvaluator`'s filter predicate against the
/// provided [`ManifestFile`]'s partitions. Used by [`TableScan`] to
/// see if this `ManifestFile` could possibly contain data that matches
/// the scan's filter.
- pub(crate) fn eval(&self, manifest_file: &ManifestFile) ->
crate::Result<bool> {
+ pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> Result<bool> {
if manifest_file.partitions.is_empty() {
return Ok(true);
}
- // The schema_id of self.partition_schema is set to the
- // spec_id of the partition spec that this ManifestEvaluator
- // was created from in ManifestEvaluator::new
- if self.partition_schema.schema_id() !=
manifest_file.partition_spec_id {
- return Err(Error::new(
- ErrorKind::Unexpected,
- format!(
- "Partition ID for manifest file '{}' does not match
partition ID for the Scan",
- &manifest_file.manifest_path
- ),
- ));
- }
-
let mut evaluator = ManifestFilterVisitor::new(self,
&manifest_file.partitions);
visit(&mut evaluator, &self.partition_filter)
@@ -273,194 +229,211 @@ impl ManifestFilterVisitor<'_> {
#[cfg(test)]
mod test {
+ use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
- use crate::expr::{Bind, Predicate, PredicateOperator, Reference,
UnaryExpression};
+ use crate::expr::{
+ Bind, BoundPredicate, Predicate, PredicateOperator, Reference,
UnaryExpression,
+ };
use crate::spec::{
FieldSummary, ManifestContentType, ManifestFile, NestedField,
PartitionField,
- PartitionSpec, PrimitiveType, Schema, Transform, Type,
+ PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef,
Transform, Type,
};
+ use crate::Result;
use std::sync::Arc;
- #[test]
- fn test_manifest_file_no_partitions() {
- let (table_schema_ref, partition_spec_ref) =
create_test_schema_and_partition_spec();
+ fn create_schema_and_partition_spec() -> Result<(SchemaRef,
PartitionSpecRef)> {
+ let schema = Schema::builder()
+ .with_fields(vec![Arc::new(NestedField::optional(
+ 1,
+ "a",
+ Type::Primitive(PrimitiveType::Float),
+ ))])
+ .build()?;
- let partition_filter = Predicate::AlwaysTrue
- .bind(table_schema_ref.clone(), false)
+ let spec = PartitionSpec::builder()
+ .with_spec_id(1)
+ .with_fields(vec![PartitionField::builder()
+ .source_id(1)
+ .name("a".to_string())
+ .field_id(1)
+ .transform(Transform::Identity)
+ .build()])
+ .build()
.unwrap();
- let case_sensitive = false;
+ Ok((Arc::new(schema), Arc::new(spec)))
+ }
- let manifest_file_partitions = vec![];
- let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+ fn create_schema_and_partition_spec_with_id_mismatch() ->
Result<(SchemaRef, PartitionSpecRef)>
+ {
+ let schema = Schema::builder()
+ .with_fields(vec![Arc::new(NestedField::optional(
+ 1,
+ "a",
+ Type::Primitive(PrimitiveType::Float),
+ ))])
+ .build()?;
- let manifest_evaluator = ManifestEvaluator::new(
- partition_spec_ref,
- table_schema_ref,
- partition_filter,
- case_sensitive,
- )
- .unwrap();
+ let spec = PartitionSpec::builder()
+ .with_spec_id(999)
+ .with_fields(vec![PartitionField::builder()
+ .source_id(1)
+ .name("a".to_string())
+ .field_id(1)
+ .transform(Transform::Identity)
+ .build()])
+ .build()
+ .unwrap();
- let result = manifest_evaluator.eval(&manifest_file).unwrap();
+ Ok((Arc::new(schema), Arc::new(spec)))
+ }
- assert!(result);
+ fn create_manifest_file(partitions: Vec<FieldSummary>) -> ManifestFile {
+ ManifestFile {
+ manifest_path: "/test/path".to_string(),
+ manifest_length: 0,
+ partition_spec_id: 1,
+ content: ManifestContentType::Data,
+ sequence_number: 0,
+ min_sequence_number: 0,
+ added_snapshot_id: 0,
+ added_data_files_count: None,
+ existing_data_files_count: None,
+ deleted_data_files_count: None,
+ added_rows_count: None,
+ existing_rows_count: None,
+ deleted_rows_count: None,
+ partitions,
+ key_metadata: vec![],
+ }
+ }
+
+ fn create_partition_schema(
+ partition_spec: &PartitionSpecRef,
+ schema: &SchemaRef,
+ ) -> Result<SchemaRef> {
+ let partition_type = partition_spec.partition_type(schema)?;
+
+ let partition_fields: Vec<_> =
partition_type.fields().iter().map(Arc::clone).collect();
+
+ let partition_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(partition_spec.spec_id)
+ .with_fields(partition_fields)
+ .build()?,
+ );
+
+ Ok(partition_schema)
+ }
+
+ fn create_partition_filter(
+ partition_spec: PartitionSpecRef,
+ partition_schema: SchemaRef,
+ filter: &BoundPredicate,
+ case_sensitive: bool,
+ ) -> Result<BoundPredicate> {
+ let mut inclusive_projection =
InclusiveProjection::new(partition_spec);
+
+ let partition_filter = inclusive_projection
+ .project(filter)?
+ .rewrite_not()
+ .bind(partition_schema, case_sensitive)?;
+
+ Ok(partition_filter)
+ }
+
+ fn create_manifest_evaluator(
+ schema: SchemaRef,
+ partition_spec: PartitionSpecRef,
+ filter: &BoundPredicate,
+ case_sensitive: bool,
+ ) -> Result<ManifestEvaluator> {
+ let partition_schema = create_partition_schema(&partition_spec,
&schema)?;
+ let partition_filter = create_partition_filter(
+ partition_spec,
+ partition_schema.clone(),
+ filter,
+ case_sensitive,
+ )?;
+
+ Ok(ManifestEvaluator::new(partition_filter, case_sensitive))
}
#[test]
- fn test_manifest_file_trivial_partition_passing_filter() {
- let (table_schema_ref, partition_spec_ref) =
create_test_schema_and_partition_spec();
+ fn test_manifest_file_empty_partitions() -> Result<()> {
+ let case_sensitive = false;
- let partition_filter = Predicate::Unary(UnaryExpression::new(
- PredicateOperator::IsNull,
- Reference::new("a"),
- ))
- .bind(table_schema_ref.clone(), true)
- .unwrap();
+ let (schema, partition_spec) = create_schema_and_partition_spec()?;
- let manifest_file_partitions = vec![FieldSummary {
- contains_null: true,
- contains_nan: None,
- lower_bound: None,
- upper_bound: None,
- }];
- let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+ let filter = Predicate::AlwaysTrue.bind(schema.clone(),
case_sensitive)?;
+
+ let manifest_file = create_manifest_file(vec![]);
let manifest_evaluator =
- ManifestEvaluator::new(partition_spec_ref, table_schema_ref,
partition_filter, true)
- .unwrap();
+ create_manifest_evaluator(schema, partition_spec, &filter,
case_sensitive)?;
- let result = manifest_evaluator.eval(&manifest_file).unwrap();
+ let result = manifest_evaluator.eval(&manifest_file)?;
assert!(result);
+
+ Ok(())
}
#[test]
- fn test_manifest_file_partition_id_mismatch_returns_error() {
- let (table_schema_ref, partition_spec_ref) =
- create_test_schema_and_partition_spec_with_id_mismatch();
+ fn test_manifest_file_trivial_partition_passing_filter() -> Result<()> {
+ let case_sensitive = true;
- let partition_filter = Predicate::Unary(UnaryExpression::new(
+ let (schema, partition_spec) = create_schema_and_partition_spec()?;
+
+ let filter = Predicate::Unary(UnaryExpression::new(
PredicateOperator::IsNull,
Reference::new("a"),
))
- .bind(table_schema_ref.clone(), true)
- .unwrap();
+ .bind(schema.clone(), case_sensitive)?;
- let manifest_file_partitions = vec![FieldSummary {
+ let manifest_file = create_manifest_file(vec![FieldSummary {
contains_null: true,
contains_nan: None,
lower_bound: None,
upper_bound: None,
- }];
- let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+ }]);
let manifest_evaluator =
- ManifestEvaluator::new(partition_spec_ref, table_schema_ref,
partition_filter, true)
- .unwrap();
+ create_manifest_evaluator(schema, partition_spec, &filter,
case_sensitive)?;
- let result = manifest_evaluator.eval(&manifest_file);
+ let result = manifest_evaluator.eval(&manifest_file)?;
- assert!(result.is_err());
+ assert!(result);
+
+ Ok(())
}
#[test]
- fn test_manifest_file_trivial_partition_rejected_filter() {
- let (table_schema_ref, partition_spec_ref) =
create_test_schema_and_partition_spec();
+ fn test_manifest_file_trivial_partition_rejected_filter() -> Result<()> {
+ let case_sensitive = true;
+
+ let (schema, partition_spec) = create_schema_and_partition_spec()?;
- let partition_filter = Predicate::Unary(UnaryExpression::new(
+ let filter = Predicate::Unary(UnaryExpression::new(
PredicateOperator::IsNan,
Reference::new("a"),
))
- .bind(table_schema_ref.clone(), true)
- .unwrap();
+ .bind(schema.clone(), case_sensitive)?;
- let manifest_file_partitions = vec![FieldSummary {
+ let manifest_file = create_manifest_file(vec![FieldSummary {
contains_null: false,
contains_nan: None,
lower_bound: None,
upper_bound: None,
- }];
- let manifest_file =
create_test_manifest_file(manifest_file_partitions);
+ }]);
let manifest_evaluator =
- ManifestEvaluator::new(partition_spec_ref, table_schema_ref,
partition_filter, true)
- .unwrap();
+ create_manifest_evaluator(schema, partition_spec, &filter,
case_sensitive)?;
let result = manifest_evaluator.eval(&manifest_file).unwrap();
assert!(!result);
- }
-
- fn create_test_schema_and_partition_spec() -> (Arc<Schema>,
Arc<PartitionSpec>) {
- let table_schema = Schema::builder()
- .with_fields(vec![Arc::new(NestedField::optional(
- 1,
- "a",
- Type::Primitive(PrimitiveType::Float),
- ))])
- .build()
- .unwrap();
- let table_schema_ref = Arc::new(table_schema);
-
- let partition_spec = PartitionSpec::builder()
- .with_spec_id(1)
- .with_fields(vec![PartitionField::builder()
- .source_id(1)
- .name("a".to_string())
- .field_id(1)
- .transform(Transform::Identity)
- .build()])
- .build()
- .unwrap();
- let partition_spec_ref = Arc::new(partition_spec);
- (table_schema_ref, partition_spec_ref)
- }
-
- fn create_test_schema_and_partition_spec_with_id_mismatch() ->
(Arc<Schema>, Arc<PartitionSpec>)
- {
- let table_schema = Schema::builder()
- .with_fields(vec![Arc::new(NestedField::optional(
- 1,
- "a",
- Type::Primitive(PrimitiveType::Float),
- ))])
- .build()
- .unwrap();
- let table_schema_ref = Arc::new(table_schema);
- let partition_spec = PartitionSpec::builder()
- // Spec ID here deliberately doesn't match the one from
create_test_manifest_file
- .with_spec_id(999)
- .with_fields(vec![PartitionField::builder()
- .source_id(1)
- .name("a".to_string())
- .field_id(1)
- .transform(Transform::Identity)
- .build()])
- .build()
- .unwrap();
- let partition_spec_ref = Arc::new(partition_spec);
- (table_schema_ref, partition_spec_ref)
- }
-
- fn create_test_manifest_file(manifest_file_partitions: Vec<FieldSummary>)
-> ManifestFile {
- ManifestFile {
- manifest_path: "/test/path".to_string(),
- manifest_length: 0,
- partition_spec_id: 1,
- content: ManifestContentType::Data,
- sequence_number: 0,
- min_sequence_number: 0,
- added_snapshot_id: 0,
- added_data_files_count: None,
- existing_data_files_count: None,
- deleted_data_files_count: None,
- added_rows_count: None,
- existing_rows_count: None,
- deleted_rows_count: None,
- partitions: manifest_file_partitions,
- key_metadata: vec![],
- }
+ Ok(())
}
}
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 36f71c1..b842522 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,21 +18,29 @@
//! Table scan api.
use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
-use crate::expr::{Bind, Predicate};
+use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::spec::{
- DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata,
TableMetadataRef,
+ DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile,
PartitionSpecRef, Schema,
+ SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
-use crate::{Error, ErrorKind};
+use crate::{Error, ErrorKind, Result};
use arrow_array::RecordBatch;
use async_stream::try_stream;
-use futures::stream::{iter, BoxStream};
+use futures::stream::BoxStream;
use futures::StreamExt;
+use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
+/// A stream of arrow [`RecordBatch`]es.
+pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
+
/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
table: &'a Table,
@@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> {
}
/// Build the table scan.
- pub fn build(self) -> crate::Result<TableScan> {
+ pub fn build(self) -> Result<TableScan> {
let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
@@ -169,55 +177,67 @@ pub struct TableScan {
filter: Option<Arc<Predicate>>,
}
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
impl TableScan {
- /// Returns a stream of file scan tasks.
-
- pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
- // Cache `ManifestEvaluatorFactory`s created as part of this scan
- let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> =
HashMap::new();
-
- // these variables needed to ensure that we don't need to pass a
- // reference to self into `try_stream`, as it expects references
- // passed in to outlive 'static
- let schema = self.schema.clone();
- let snapshot = self.snapshot.clone();
- let table_metadata = self.table_metadata.clone();
- let file_io = self.file_io.clone();
- let case_sensitive = self.case_sensitive;
- let filter = self.filter.clone();
+ /// Returns a stream of [`FileScanTask`]s.
+ pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+ let context = FileScanStreamContext::new(
+ self.schema.clone(),
+ self.snapshot.clone(),
+ self.table_metadata.clone(),
+ self.file_io.clone(),
+ self.filter.clone(),
+ self.case_sensitive,
+ )?;
+
+ let mut partition_filter_cache = PartitionFilterCache::new();
+ let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
Ok(try_stream! {
- let manifest_list = snapshot
- .clone()
- .load_manifest_list(&file_io, &table_metadata)
- .await?;
+ let manifest_list = context
+ .snapshot
+ .load_manifest_list(&context.file_io, &context.table_metadata)
+ .await?;
- // Generate data file stream
for entry in manifest_list.entries() {
- // If this scan has a filter, check the partition evaluator
cache for an existing
- // PartitionEvaluator that matches this manifest's partition
spec ID.
- // Use one from the cache if there is one. If not, create one,
put it in
- // the cache, and take a reference to it.
- #[allow(clippy::map_entry)]
- if let Some(filter) = filter.as_ref() {
- if
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-
manifest_evaluator_cache.insert(entry.partition_spec_id,
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(),
table_metadata.clone(), case_sensitive, filter)?);
- }
- let manifest_evaluator =
&manifest_evaluator_cache[&entry.partition_spec_id];
+ if !Self::content_type_is_data(entry) {
+ continue;
+ }
+
+ if let Some(filter) = context.bound_filter() {
+ let partition_spec_id = entry.partition_spec_id;
+
+ let (partition_spec, partition_schema) =
+
context.create_partition_spec_and_schema(partition_spec_id)?;
+
+ let partition_filter = partition_filter_cache.get(
+ partition_spec_id,
+ partition_spec,
+ partition_schema.clone(),
+ filter,
+ context.case_sensitive,
+ )?;
+
+ let manifest_evaluator = manifest_evaluator_cache.get(
+ partition_schema.schema_id(),
+ partition_filter.clone(),
+ context.case_sensitive,
+ );
- // reject any manifest files whose partition values don't
match the filter.
if !manifest_evaluator.eval(entry)? {
continue;
}
+
+ // TODO: Create ExpressionEvaluator
}
- let manifest = entry.load_manifest(&file_io).await?;
+ let manifest = entry.load_manifest(&context.file_io).await?;
+ let mut manifest_entries_stream =
+ futures::stream::iter(manifest.entries().iter().filter(|e|
e.is_alive()));
+
+ while let Some(manifest_entry) =
manifest_entries_stream.next().await {
+ // TODO: Apply ExpressionEvaluator
+ // TODO: Apply InclusiveMetricsEvaluator::eval()
- let mut manifest_entries =
iter(manifest.entries().iter().filter(|e| e.is_alive()));
- while let Some(manifest_entry) = manifest_entries.next().await
{
match manifest_entry.content_type() {
DataContentType::EqualityDeletes |
DataContentType::PositionDeletes => {
yield Err(Error::new(
@@ -226,7 +246,7 @@ impl TableScan {
))?;
}
DataContentType::Data => {
- let scan_task: crate::Result<FileScanTask> =
Ok(FileScanTask {
+ let scan_task: Result<FileScanTask> =
Ok(FileScanTask {
data_manifest_entry: manifest_entry.clone(),
start: 0,
length: manifest_entry.file_size_in_bytes(),
@@ -240,29 +260,8 @@ impl TableScan {
.boxed())
}
- fn create_manifest_evaluator(
- id: i32,
- schema: SchemaRef,
- table_metadata: Arc<TableMetadata>,
- case_sensitive: bool,
- filter: &Predicate,
- ) -> crate::Result<ManifestEvaluator> {
- let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;
-
- let partition_spec =
table_metadata.partition_spec_by_id(id).ok_or(Error::new(
- ErrorKind::Unexpected,
- format!("Could not find partition spec for id {id}"),
- ))?;
-
- ManifestEvaluator::new(
- partition_spec.clone(),
- schema.clone(),
- bound_predicate,
- case_sensitive,
- )
- }
-
- pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
+ /// Returns an [`ArrowRecordBatchStream`].
+ pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder =
ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
@@ -312,6 +311,147 @@ impl TableScan {
arrow_reader_builder.build().read(self.plan_files().await?)
}
+
+ /// Checks whether the [`ManifestContentType`] is `Data` or not.
+ fn content_type_is_data(entry: &ManifestFile) -> bool {
+ if let ManifestContentType::Data = entry.content {
+ return true;
+ }
+ false
+ }
+}
+
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+ schema: SchemaRef,
+ snapshot: SnapshotRef,
+ table_metadata: TableMetadataRef,
+ file_io: FileIO,
+ bound_filter: Option<BoundPredicate>,
+ case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+ /// Creates a new [`FileScanStreamContext`].
+ fn new(
+ schema: SchemaRef,
+ snapshot: SnapshotRef,
+ table_metadata: TableMetadataRef,
+ file_io: FileIO,
+ filter: Option<Arc<Predicate>>,
+ case_sensitive: bool,
+ ) -> Result<Self> {
+ let bound_filter = match filter {
+ Some(ref filter) => Some(filter.bind(schema.clone(),
case_sensitive)?),
+ None => None,
+ };
+
+ Ok(Self {
+ schema,
+ snapshot,
+ table_metadata,
+ file_io,
+ bound_filter,
+ case_sensitive,
+ })
+ }
+
+ /// Returns a reference to the [`BoundPredicate`] filter.
+ fn bound_filter(&self) -> Option<&BoundPredicate> {
+ self.bound_filter.as_ref()
+ }
+
+ /// Creates a reference-counted [`PartitionSpec`] and a
+ /// corresponding [`Schema`] based on the specified partition spec id.
+ fn create_partition_spec_and_schema(
+ &self,
+ spec_id: i32,
+ ) -> Result<(PartitionSpecRef, SchemaRef)> {
+ let partition_spec =
+ self.table_metadata
+ .partition_spec_by_id(spec_id)
+ .ok_or(Error::new(
+ ErrorKind::Unexpected,
+ format!("Could not find partition spec for id {}",
spec_id),
+ ))?;
+
+ let partition_type = partition_spec.partition_type(&self.schema)?;
+ let partition_fields = partition_type.fields().to_owned();
+ let partition_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(partition_spec.spec_id)
+ .with_fields(partition_fields)
+ .build()?,
+ );
+
+ Ok((partition_spec.clone(), partition_schema))
+ }
+}
+
+#[derive(Debug)]
+/// Manages the caching of [`BoundPredicate`] objects
+/// for [`PartitionSpec`]s based on partition spec id.
+struct PartitionFilterCache(HashMap<i32, BoundPredicate>);
+
+impl PartitionFilterCache {
+ /// Creates a new [`PartitionFilterCache`]
+ /// with an empty internal HashMap.
+ fn new() -> Self {
+ Self(HashMap::new())
+ }
+
+ /// Retrieves a [`BoundPredicate`] from the cache
+ /// or computes it if not present.
+ fn get(
+ &mut self,
+ spec_id: i32,
+ partition_spec: PartitionSpecRef,
+ partition_schema: SchemaRef,
+ filter: &BoundPredicate,
+ case_sensitive: bool,
+ ) -> Result<&BoundPredicate> {
+ match self.0.entry(spec_id) {
+ Entry::Occupied(e) => Ok(e.into_mut()),
+ Entry::Vacant(e) => {
+ let mut inclusive_projection =
InclusiveProjection::new(partition_spec);
+
+ let partition_filter = inclusive_projection
+ .project(filter)?
+ .rewrite_not()
+ .bind(partition_schema, case_sensitive)?;
+
+ Ok(e.insert(partition_filter))
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+/// Manages the caching of [`ManifestEvaluator`] objects
+/// for [`PartitionSpec`]s based on partition spec id.
+struct ManifestEvaluatorCache(HashMap<i32, ManifestEvaluator>);
+
+impl ManifestEvaluatorCache {
+ /// Creates a new [`ManifestEvaluatorCache`]
+ /// with an empty internal HashMap.
+ fn new() -> Self {
+ Self(HashMap::new())
+ }
+
+ /// Retrieves a [`ManifestEvaluator`] from the cache
+ /// or computes it if not present.
+ fn get(
+ &mut self,
+ spec_id: i32,
+ partition_filter: BoundPredicate,
+ case_sensitive: bool,
+ ) -> &mut ManifestEvaluator {
+ self.0
+ .entry(spec_id)
+ .or_insert(ManifestEvaluator::new(partition_filter,
case_sensitive))
+ }
}
/// A task to scan part of file.
@@ -324,9 +464,6 @@ pub struct FileScanTask {
length: u64,
}
-/// A stream of arrow record batches.
-pub type ArrowRecordBatchStream = BoxStream<'static,
crate::Result<RecordBatch>>;
-
impl FileScanTask {
pub fn data(&self) -> ManifestEntryRef {
self.data_manifest_entry.clone()