alamb commented on code in PR #21190:
URL: https://github.com/apache/datafusion/pull/21190#discussion_r3002315834
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -125,15 +133,338 @@ pub(super) struct ParquetOpener {
pub reverse_row_groups: bool,
}
+/// States for [`ParquetOpenFuture`]
+///
+/// These states correspond to the steps required to read and apply various
+/// filter operations.
+///
+/// States whose names begin with `Load` represent waiting on IO to resolve.
+///
+/// ```text
+/// Start
+/// |
+/// v
+/// [LoadEncryption]?
+/// |
+/// v
+/// PruneFile
+/// |
+/// v
+/// LoadMetadata
+/// |
+/// v
+/// PrepareFilters
+/// |
+/// v
+/// LoadPageIndex
+/// |
+/// v
+/// PruneWithStatistics
+/// |
+/// v
+/// PruneWithBloomFilters
+/// |
+/// v
+/// BuildStream
+/// |
+/// v
+/// Done
+/// ```
+///
+/// Note: `LoadEncryption` is only present when the `parquet_encryption` feature is
+/// enabled. All other states are always visited in the order shown above,
+/// though any async state may return `Poll::Pending` and then resume later.
+enum ParquetOpenState {
+ Start {
+ prepared: Box<PreparedParquetOpen>,
+ #[cfg(feature = "parquet_encryption")]
+ encryption_context: Arc<EncryptionContext>,
+ },
+ /// Loading encryption footers
+ #[cfg(feature = "parquet_encryption")]
+ LoadEncryption(BoxFuture<'static, Result<Box<PreparedParquetOpen>>>),
+ /// Try to prune file using only file-level statistics and partition
+ /// values before loading any parquet metadata
+ PruneFile(Box<PreparedParquetOpen>),
+ /// Loading Parquet metadata (in footer)
+ LoadMetadata(BoxFuture<'static, Result<MetadataLoadedParquetOpen>>),
+ /// Specialize any filters for the actual file schema (only known after
+ /// metadata is loaded)
+ PrepareFilters(Box<MetadataLoadedParquetOpen>),
+ /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/)
+ LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
+ /// Pruning Row Groups
+ PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
+ /// Pruning with Bloom Filters
+ ///
+ /// TODO: split state as this currently does both I/O and CPU work
+ PruneWithBloomFilters(BoxFuture<'static,
Result<RowGroupsPrunedParquetOpen>>),
+ /// Builds the final reader stream
+ ///
+ /// TODO: split state as this currently does both I/O and CPU work.
+ BuildStream(Box<RowGroupsPrunedParquetOpen>),
+ /// Terminal state: the final opened stream is ready to return.
+ Ready(BoxStream<'static, Result<RecordBatch>>),
+ /// Terminal state: reading complete
+ Done,
+}
+
+struct PreparedParquetOpen {
+ partition_index: usize,
+ partitioned_file: PartitionedFile,
+ file_range: Option<datafusion_datasource::FileRange>,
+ extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+ file_name: String,
+ file_metrics: ParquetFileMetrics,
+ baseline_metrics: BaselineMetrics,
+ file_pruner: Option<FilePruner>,
+ metadata_size_hint: Option<usize>,
+ metrics: ExecutionPlanMetricsSet,
+ parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+ async_file_reader: Box<dyn AsyncFileReader>,
+ batch_size: usize,
+ logical_file_schema: SchemaRef,
+ physical_file_schema: SchemaRef,
+ output_schema: SchemaRef,
+ projection: ProjectionExprs,
+ predicate: Option<Arc<dyn PhysicalExpr>>,
+ reorder_predicates: bool,
+ pushdown_filters: bool,
+ force_filter_selections: bool,
+ enable_page_index: bool,
+ enable_bloom_filter: bool,
+ enable_row_group_stats_pruning: bool,
+ limit: Option<usize>,
+ coerce_int96: Option<TimeUnit>,
+ expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
+ predicate_creation_errors: Count,
+ max_predicate_cache_size: Option<usize>,
+ reverse_row_groups: bool,
+ preserve_order: bool,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+}
+
+/// State of [`ParquetOpenState`]
+///
+/// Result of loading parquet metadata after file-level pruning is complete.
+struct MetadataLoadedParquetOpen {
+ prepared: PreparedParquetOpen,
+ reader_metadata: ArrowReaderMetadata,
+ options: ArrowReaderOptions,
+}
+
+/// State of [`ParquetOpenState`]
+///
+/// Pruning Predicate and DataPage pruning information
+/// specialized for the file's specific schema.
+struct FiltersPreparedParquetOpen {
+ loaded: MetadataLoadedParquetOpen,
+ pruning_predicate: Option<Arc<PruningPredicate>>,
+ page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
+}
+
+/// State of [`ParquetOpenState`]
+///
+/// Result of CPU-only row-group pruning before optional bloom-filter I/O.
+struct RowGroupsPrunedParquetOpen {
+ prepared: FiltersPreparedParquetOpen,
+ row_groups: RowGroupAccessPlanFilter,
+}
+
+/// Implements state machine described in [`ParquetOpenState`]
+struct ParquetOpenFuture {
+ state: ParquetOpenState,
+}
+
+impl ParquetOpenFuture {
+ #[cfg(feature = "parquet_encryption")]
+ fn new(prepared: PreparedParquetOpen, encryption_context:
EncryptionContext) -> Self {
+ Self {
+ state: ParquetOpenState::Start {
+ prepared: Box::new(prepared),
+ encryption_context: Arc::new(encryption_context),
+ },
+ }
+ }
+
+ #[cfg(not(feature = "parquet_encryption"))]
+ fn new(prepared: PreparedParquetOpen) -> Self {
+ Self {
+ state: ParquetOpenState::Start {
+ prepared: Box::new(prepared),
+ },
+ }
+ }
+}
+
+impl ParquetOpenState {
+ /// Applies one CPU-only state transition.
+ ///
+ /// `Load*` states do not transition here and are returned unchanged so the
+ /// driver loop can poll their inner futures separately.
+ fn transition(self) -> Result<ParquetOpenState> {
Review Comment:
here is the new state machine
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]