adriangb commented on code in PR #21327:
URL: https://github.com/apache/datafusion/pull/21327#discussion_r3051778184


##########
datafusion/datasource/src/morsel/mod.rs:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Structures for Morsel Driven IO.
+//!
+//! Morsel Driven IO is a technique for parallelizing the reading of large files
+//! by dividing them into smaller "morsels" that are processed independently.
+//!
+//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query
+//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf).
+
+use crate::PartitionedFile;
+use arrow::array::RecordBatch;
+use datafusion_common::Result;
+use futures::FutureExt;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es.
+///
+/// This represents a single morsel of work that is ready to be processed. It
+/// has all data necessary (does not need any I/O) and is ready to be turned
+/// into a stream of [`RecordBatch`]es for processing by the execution engine.
+pub trait Morsel: Send + Debug {
+    /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing.
+    ///
+    /// Note: This may do CPU work to decode already-loaded data, but should not
+    /// do any I/O work such as reading from the file.
+    fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>>;
+}
+
+/// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner
+/// for that file.
+///
+/// This is the entry point for morsel driven I/O.
+pub trait Morselizer: Send + Sync + Debug {
+    /// Return the initial [`MorselPlanner`] for this file.
+    ///
+    /// Morselizing a file may involve CPU work, such as parsing parquet
+    /// metadata and evaluating pruning predicates. It should NOT do any I/O
+    /// work, such as reading from the file. Any needed I/O should be done using
+    /// [`MorselPlan::with_pending_planner`].
+    fn plan_file(&self, file: PartitionedFile) -> Result<Box<dyn MorselPlanner>>;
+}
+
+/// A Morsel Planner is responsible for creating morsels for a given scan.
+///
+/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O
+/// outstanding for a specific planner. DataFusion may run
+/// multiple planners in parallel, which corresponds to multiple parallel
+/// I/O requests.
+///
+/// It is not a Rust `Stream` so that it can explicitly separate CPU bound
+/// work from I/O work.
+///
+/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it
+/// should do CPU work to produce the next morsels or discover the next I/O
+/// phase.
+///
+/// Best practice is to spawn I/O in a Tokio task on a separate runtime to
+/// ensure that CPU work doesn't block or slow down I/O work, but this is not
+/// strictly required by the API.
+pub trait MorselPlanner: Send + Debug {
+    /// Attempt to plan morsels. This may involve CPU work, such as parsing
+    /// parquet metadata and evaluating pruning predicates.
+    ///
+    /// It should NOT do any I/O work, such as reading from the file. If I/O is
+    /// required, the returned [`MorselPlan`] should contain a pending planner
+    /// future that the caller polls to drive the I/O work to completion. Once
+    /// that future resolves, it yields a planner ready for work.
+    ///
+    /// Note this function is **not async** to make it explicitly clear that if
+    /// I/O is required, it should be done in the returned `io_future`.
+    ///
+    /// Returns `None` if the planner has no more work to do.
+    ///
+    /// # Empty Morsel Plans
+    ///
+    /// It may return `None`, which means no batches will be read from the file
+    /// (e.g. due to late-pruning based on statistics).
+    ///
+    /// # Output Ordering
+    ///
+    /// See the comments on [`MorselPlan`] for the logical output order.
+    fn plan(self: Box<Self>) -> Result<Option<MorselPlan>>;

Review Comment:
   Nice
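   For readers following this thread, here is a std-only sketch of how a caller might drive the `MorselPlanner` / `Morsel` contract described in the doc comments above. `plan` does CPU work only and returns ready morsels, and any needed I/O is handed back as a continuation that yields the next planner. Everything in it is a hypothetical stand-in, not the PR's actual types: `Batch = Vec<i32>` replaces `RecordBatch`, the `MorselPlan` fields are an assumed shape, a closure replaces the boxed future behind `MorselPlan::with_pending_planner`, and `ToyPlanner`, `RowGroupMorsel`, and `drive` are invented for illustration. A real driver would await the pending future, ideally on a separate I/O runtime as the doc comments suggest.

```rust
use std::collections::VecDeque;
use std::fmt::Debug;

// Hypothetical stand-in for `RecordBatch`.
type Batch = Vec<i32>;

/// Simplified `Morsel`: all data is already loaded, so this is CPU-only.
trait Morsel: Debug {
    fn into_batches(self: Box<Self>) -> Vec<Batch>;
}

/// Simplified `MorselPlanner`: `plan` is synchronous and must not do I/O.
trait MorselPlanner: Debug {
    fn plan(self: Box<Self>) -> Result<Option<MorselPlan>, String>;
}

// Hypothetical stand-in for the pending-planner future: "I/O" that,
// once resolved, yields the next planner.
type PendingPlanner = Box<dyn FnOnce() -> Result<Box<dyn MorselPlanner>, String>>;

/// Assumed shape of `MorselPlan`: ready morsels plus optional pending I/O.
struct MorselPlan {
    morsels: Vec<Box<dyn Morsel>>,
    pending_planner: Option<PendingPlanner>,
}

#[derive(Debug)]
struct RowGroupMorsel {
    values: Vec<i32>,
}

impl Morsel for RowGroupMorsel {
    fn into_batches(self: Box<Self>) -> Vec<Batch> {
        vec![self.values]
    }
}

/// Hypothetical planner that must "fetch metadata" (I/O) before emitting morsels.
#[derive(Debug)]
enum ToyPlanner {
    NeedsMetadata { row_groups: Vec<Vec<i32>> },
    Ready { row_groups: Vec<Vec<i32>> },
}

impl MorselPlanner for ToyPlanner {
    fn plan(self: Box<Self>) -> Result<Option<MorselPlan>, String> {
        match *self {
            // No CPU work is possible yet: hand the I/O back to the caller.
            ToyPlanner::NeedsMetadata { row_groups } => {
                let io = move || -> Result<Box<dyn MorselPlanner>, String> {
                    Ok(Box::new(ToyPlanner::Ready { row_groups }) as Box<dyn MorselPlanner>)
                };
                Ok(Some(MorselPlan {
                    morsels: Vec::new(),
                    pending_planner: Some(Box::new(io)),
                }))
            }
            // Everything was pruned away: no batches for this file.
            ToyPlanner::Ready { row_groups } if row_groups.is_empty() => Ok(None),
            // CPU-only: one morsel per row group, no further I/O.
            ToyPlanner::Ready { row_groups } => Ok(Some(MorselPlan {
                morsels: row_groups
                    .into_iter()
                    .map(|values| Box::new(RowGroupMorsel { values }) as Box<dyn Morsel>)
                    .collect(),
                pending_planner: None,
            })),
        }
    }
}

/// Hypothetical sequential driver; real code would await the I/O future.
fn drive(planner: Box<dyn MorselPlanner>) -> Result<Vec<Batch>, String> {
    let mut batches = Vec::new();
    let mut planners: VecDeque<Box<dyn MorselPlanner>> = VecDeque::from([planner]);
    while let Some(planner) = planners.pop_front() {
        let Some(plan) = planner.plan()? else {
            continue; // this planner has no more work
        };
        for morsel in plan.morsels {
            batches.extend(morsel.into_batches());
        }
        if let Some(io) = plan.pending_planner {
            // Resolve the I/O, then resume with the planner it yields.
            planners.push_back(io()?);
        }
    }
    Ok(batches)
}
```

   For example, a `ToyPlanner::NeedsMetadata` holding row groups `[1, 2]` and `[3]` first returns only pending I/O. After `drive` resolves it, the resulting planner emits one morsel per row group, so `drive` returns `[[1, 2], [3]]`. An empty file yields `Ok(None)` after the I/O step and produces no batches.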



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -454,32 +453,46 @@ impl Future for ParquetOpenFuture {
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         loop {
-            // If waiting on IO, poll
+            // If planner I/O completed, resume with the returned planner.
             if let Some(io_future) = self.pending_io.as_mut() {
-                ready!(io_future.poll_unpin(cx))?;
+                let maybe_planner = ready!(io_future.poll_unpin(cx));
+                // Clear `pending_io` before handling the result so an error
+                // cannot leave both continuation paths populated.
                 self.pending_io = None;
+                self.planner = Some(maybe_planner?);

Review Comment:
   Is there a bad state here if we already had a planner? If that's not possible, should we assert the invariant (`assert!(self.planner.is_none())`)?
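   One way to make this invariant explicit is sketched below with hypothetical std-only stand-ins. `OpenState` and `Planner` are invented for illustration, and a closure replaces the boxed I/O future, whereas the real code polls that future inside `Future::poll`. `Option::take` clears `pending_io` before the result is inspected. A `debug_assert!` (used here in place of the suggested `assert!`) documents that a planner and pending I/O are never held at the same time.

```rust
// Hypothetical stand-in for `Box<dyn MorselPlanner>`.
#[derive(Debug, PartialEq)]
struct Planner(u32);

// Hypothetical stand-in for the boxed I/O future: already-completed "I/O".
type PendingIo = Box<dyn FnOnce() -> Result<Planner, String>>;

/// Two mutually exclusive continuation paths, modeled on `ParquetOpenFuture`.
struct OpenState {
    pending_io: Option<PendingIo>,
    planner: Option<Planner>,
}

impl OpenState {
    /// Resume after planner I/O completes.
    fn resume_after_io(&mut self) -> Result<(), String> {
        // `take()` clears `pending_io` before the result is inspected, so an
        // error cannot leave both continuation paths populated.
        if let Some(io) = self.pending_io.take() {
            // Invariant raised in review: pending I/O and a planner are
            // never held at the same time.
            debug_assert!(
                self.planner.is_none(),
                "planner already set while I/O was pending"
            );
            let maybe_planner = io();
            self.planner = Some(maybe_planner?);
        }
        Ok(())
    }
}
```

   If the I/O closure returns an error, `resume_after_io` propagates it with `?`, and both `pending_io` and `planner` are left as `None`.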



##########
datafusion/datasource/src/mod.rs:
##########
@@ -38,6 +38,7 @@ pub mod file_scan_config;
 pub mod file_sink_config;
 pub mod file_stream;
 pub mod memory;
+pub mod morsel;

Review Comment:
   I wonder if a comment along the lines of `// Experimental new APIs, not ready for public usage yet` would be good here? We'll hopefully finish this work before the next release, but...



-- 
This is an automated message from the Apache Git Service.