alamb commented on code in PR #21190:
URL: https://github.com/apache/datafusion/pull/21190#discussion_r3002321194


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -125,15 +133,338 @@ pub(super) struct ParquetOpener {
     pub reverse_row_groups: bool,
 }
 
+/// States for [`ParquetOpenFuture`]
+///
+/// These states correspond to the steps required to read and apply various
+/// filter operations.
+///
+/// States whose names begin with `Load` represent waiting on IO to resolve
+///
+/// ```text
+///      Start
+///        |
+///        v
+/// [LoadEncryption]?
+///        |
+///        v
+///    PruneFile
+///        |
+///        v
+///   LoadMetadata
+///        |
+///        v
+///  PrepareFilters
+///        |
+///        v
+///   LoadPageIndex
+///        |
+///        v
+/// PruneWithStatistics
+///        |
+///        v
+/// PruneWithBloomFilters
+///        |
+///        v
+///   BuildStream
+///        |
+///        v
+///       Done
+/// ```
+///
+/// Note: `LoadEncryption` is only present when the `parquet_encryption` feature is
+/// enabled. All other states are always visited in the order shown above,
+/// though any async state may return `Poll::Pending` and then resume later.
+enum ParquetOpenState {
+    Start {
+        prepared: Box<PreparedParquetOpen>,
+        #[cfg(feature = "parquet_encryption")]
+        encryption_context: Arc<EncryptionContext>,
+    },
+    /// Loading encryption footers
+    #[cfg(feature = "parquet_encryption")]
+    LoadEncryption(BoxFuture<'static, Result<Box<PreparedParquetOpen>>>),
+    /// Try to prune file using only file-level statistics and partition
+    /// values before loading any parquet metadata
+    PruneFile(Box<PreparedParquetOpen>),
+    /// Loading Parquet metadata (in footer)
+    LoadMetadata(BoxFuture<'static, Result<MetadataLoadedParquetOpen>>),
+    /// Specialize any filters for the actual file schema (only known after
+    /// metadata is loaded)
+    PrepareFilters(Box<MetadataLoadedParquetOpen>),
+    /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/)
+    LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
+    /// Pruning Row Groups
+    PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
+    /// Pruning with Bloom Filters
+    ///
+    /// TODO: split state as this currently does both I/O and CPU work

Review Comment:
   I am trying to minimize the diff, so I want to break out the CPU and IO in a follow-on PR.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -125,15 +133,338 @@ pub(super) struct ParquetOpener {
     pub reverse_row_groups: bool,
 }
 
+/// States for [`ParquetOpenFuture`]
+///
+/// These states correspond to the steps required to read and apply various
+/// filter operations.
+///
+/// States whose names begin with `Load` represent waiting on IO to resolve
+///
+/// ```text
+///      Start
+///        |
+///        v
+/// [LoadEncryption]?
+///        |
+///        v
+///    PruneFile
+///        |
+///        v
+///   LoadMetadata
+///        |
+///        v
+///  PrepareFilters
+///        |
+///        v
+///   LoadPageIndex
+///        |
+///        v
+/// PruneWithStatistics
+///        |
+///        v
+/// PruneWithBloomFilters
+///        |
+///        v
+///   BuildStream
+///        |
+///        v
+///       Done
+/// ```
+///
+/// Note: `LoadEncryption` is only present when the `parquet_encryption` feature is
+/// enabled. All other states are always visited in the order shown above,
+/// though any async state may return `Poll::Pending` and then resume later.
+enum ParquetOpenState {
+    Start {
+        prepared: Box<PreparedParquetOpen>,
+        #[cfg(feature = "parquet_encryption")]
+        encryption_context: Arc<EncryptionContext>,
+    },
+    /// Loading encryption footers
+    #[cfg(feature = "parquet_encryption")]
+    LoadEncryption(BoxFuture<'static, Result<Box<PreparedParquetOpen>>>),
+    /// Try to prune file using only file-level statistics and partition
+    /// values before loading any parquet metadata
+    PruneFile(Box<PreparedParquetOpen>),
+    /// Loading Parquet metadata (in footer)
+    LoadMetadata(BoxFuture<'static, Result<MetadataLoadedParquetOpen>>),
+    /// Specialize any filters for the actual file schema (only known after
+    /// metadata is loaded)
+    PrepareFilters(Box<MetadataLoadedParquetOpen>),
+    /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/)
+    LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
+    /// Pruning Row Groups
+    PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
+    /// Pruning with Bloom Filters
+    ///
+    /// TODO: split state as this currently does both I/O and CPU work
+    PruneWithBloomFilters(BoxFuture<'static, Result<RowGroupsPrunedParquetOpen>>),
+    /// Builds the final reader stream
+    ///
+    /// TODO: split state as this currently does both I/O and CPU work.
+    BuildStream(Box<RowGroupsPrunedParquetOpen>),
+    /// Terminal state: the final opened stream is ready to return.
+    Ready(BoxStream<'static, Result<RecordBatch>>),
+    /// Terminal state: reading complete
+    Done,
+}
+
+struct PreparedParquetOpen {
+    partition_index: usize,
+    partitioned_file: PartitionedFile,
+    file_range: Option<datafusion_datasource::FileRange>,
+    extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+    file_name: String,
+    file_metrics: ParquetFileMetrics,
+    baseline_metrics: BaselineMetrics,
+    file_pruner: Option<FilePruner>,
+    metadata_size_hint: Option<usize>,
+    metrics: ExecutionPlanMetricsSet,
+    parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    async_file_reader: Box<dyn AsyncFileReader>,
+    batch_size: usize,
+    logical_file_schema: SchemaRef,
+    physical_file_schema: SchemaRef,
+    output_schema: SchemaRef,
+    projection: ProjectionExprs,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    reorder_predicates: bool,
+    pushdown_filters: bool,
+    force_filter_selections: bool,
+    enable_page_index: bool,
+    enable_bloom_filter: bool,
+    enable_row_group_stats_pruning: bool,
+    limit: Option<usize>,
+    coerce_int96: Option<TimeUnit>,
+    expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
+    predicate_creation_errors: Count,
+    max_predicate_cache_size: Option<usize>,
+    reverse_row_groups: bool,
+    preserve_order: bool,
+    #[cfg(feature = "parquet_encryption")]
+    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+}
+
+/// State of [`ParquetOpenState`]
+///
+/// Result of loading parquet metadata after file-level pruning is complete.
+struct MetadataLoadedParquetOpen {
+    prepared: PreparedParquetOpen,
+    reader_metadata: ArrowReaderMetadata,
+    options: ArrowReaderOptions,
+}
+
+/// State of [`ParquetOpenState`]
+///
+/// Pruning Predicate and DataPage pruning information
+/// specialized for the files specific schema.
+struct FiltersPreparedParquetOpen {
+    loaded: MetadataLoadedParquetOpen,
+    pruning_predicate: Option<Arc<PruningPredicate>>,
+    page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
+}
+
+/// State of [`ParquetOpenState`]
+///
+/// Result of CPU-only row-group pruning before optional bloom-filter I/O.
+struct RowGroupsPrunedParquetOpen {
+    prepared: FiltersPreparedParquetOpen,
+    row_groups: RowGroupAccessPlanFilter,
+}
+
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+    state: ParquetOpenState,
+}
+
+impl ParquetOpenFuture {
+    #[cfg(feature = "parquet_encryption")]
+    fn new(prepared: PreparedParquetOpen, encryption_context: EncryptionContext) -> Self {
+        Self {
+            state: ParquetOpenState::Start {
+                prepared: Box::new(prepared),
+                encryption_context: Arc::new(encryption_context),
+            },
+        }
+    }
+
+    #[cfg(not(feature = "parquet_encryption"))]
+    fn new(prepared: PreparedParquetOpen) -> Self {
+        Self {
+            state: ParquetOpenState::Start {
+                prepared: Box::new(prepared),
+            },
+        }
+    }
+}
+
+impl ParquetOpenState {
+    /// Applies one CPU-only state transition.
+    ///
+    /// `Load*` states do not transition here and are returned unchanged so the
+    /// driver loop can poll their inner futures separately.
+    fn transition(self) -> Result<ParquetOpenState> {
+        match self {
+            ParquetOpenState::Start {
+                prepared,
+                #[cfg(feature = "parquet_encryption")]
+                encryption_context,
+            } => {
+                #[cfg(feature = "parquet_encryption")]
+                {
+                    let mut prepared = *prepared;
+                    let future = async move {
+                        let file_location =
+                            &prepared.partitioned_file.object_meta.location;
+                        prepared.file_decryption_properties = encryption_context
+                            .get_file_decryption_properties(file_location)
+                            .await?;
+                        Ok(Box::new(prepared))
+                    }
+                    .boxed();
+                    Ok(ParquetOpenState::LoadEncryption(future))
+                }
+                #[cfg(not(feature = "parquet_encryption"))]
+                {
+                    Ok(ParquetOpenState::PruneFile(prepared))
+                }
+            }
+            #[cfg(feature = "parquet_encryption")]
+            ParquetOpenState::LoadEncryption(future) => {
+                Ok(ParquetOpenState::LoadEncryption(future))
+            }
+            ParquetOpenState::PruneFile(prepared) => {
+                let Some(prepared) = (*prepared).prune_file()? else {
+                    return Ok(ParquetOpenState::Done);
+                };
+                Ok(ParquetOpenState::LoadMetadata(prepared.load().boxed()))
+            }
+            ParquetOpenState::LoadMetadata(future) => {
+                Ok(ParquetOpenState::LoadMetadata(future))
+            }
+            ParquetOpenState::PrepareFilters(loaded) => {
+                let prepared_filters = loaded.prepare_filters()?;
+                Ok(ParquetOpenState::LoadPageIndex(
+                    prepared_filters.load_page_index().boxed(),
+                ))
+            }
+            ParquetOpenState::LoadPageIndex(future) => {
+                Ok(ParquetOpenState::LoadPageIndex(future))
+            }
+            ParquetOpenState::PruneWithStatistics(prepared) => {
+                let prepared_row_groups = prepared.prune_row_groups()?;
+                Ok(ParquetOpenState::PruneWithBloomFilters(
+                    prepared_row_groups.prune_bloom_filters().boxed(),
+                ))
+            }
+            ParquetOpenState::PruneWithBloomFilters(future) => {
+                Ok(ParquetOpenState::PruneWithBloomFilters(future))
+            }
+            ParquetOpenState::BuildStream(prepared) => {
+                Ok(ParquetOpenState::Ready(prepared.build_stream()?))
+            }
+            ParquetOpenState::Ready(stream) => Ok(ParquetOpenState::Ready(stream)),
+            ParquetOpenState::Done => {
+                panic!("ParquetOpenFuture polled after completion");
+            }
+        }
+    }
+}
+
+impl Future for ParquetOpenFuture {
+    type Output = Result<BoxStream<'static, Result<RecordBatch>>>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        loop {
+            let state = mem::replace(&mut self.state, ParquetOpenState::Done);
+            let mut state = state.transition()?;
+
+            match state {

Review Comment:
   Here is the code that adapts the state machine into a future: it drives the CPU-only state transitions and polls the pending IO futures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to