This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3f6298db59 Minor: Move streams out of `physical_plan` module (#7234)
3f6298db59 is described below
commit 3f6298db59a87084973feb3e9d452f118dd51bd9
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 08:59:09 2023 -0500
Minor: Move streams out of `physical_plan` module (#7234)
* Minor: Move streams out of main physical_plan module
* Update datafusion-cli Cargo
---
datafusion-cli/Cargo.lock | 72 +++++++++++++-------------
datafusion/core/src/physical_plan/mod.rs | 50 ++----------------
datafusion/core/src/physical_plan/stream.rs | 51 ++++++++++++++----
datafusion/execution/Cargo.toml | 2 +
datafusion/execution/src/lib.rs | 2 +
datafusion/execution/src/{lib.rs => stream.rs} | 25 +++++----
6 files changed, 101 insertions(+), 101 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 22c8e69bc9..769b3ac367 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -814,9 +814,9 @@ dependencies = [
[[package]]
name = "cc"
-version = "1.0.80"
+version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51f1226cd9da55587234753d1245dd5b132343ea240f26b6a9003d68706141ba"
+checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01"
dependencies = [
"jobserver",
"libc",
@@ -1134,9 +1134,11 @@ dependencies = [
name = "datafusion-execution"
version = "28.0.0"
dependencies = [
+ "arrow",
"dashmap",
"datafusion-common",
"datafusion-expr",
+ "futures",
"hashbrown 0.14.0",
"log",
"object_store",
@@ -1156,7 +1158,7 @@ dependencies = [
"lazy_static",
"sqlparser",
"strum 0.25.0",
- "strum_macros 0.25.1",
+ "strum_macros 0.25.2",
]
[[package]]
@@ -1222,9 +1224,9 @@ dependencies = [
[[package]]
name = "deranged"
-version = "0.3.6"
+version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01"
+checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929"
[[package]]
name = "difflib"
@@ -1735,7 +1737,7 @@ dependencies = [
"futures-util",
"http",
"hyper",
- "rustls 0.21.5",
+ "rustls 0.21.6",
"tokio",
"tokio-rustls 0.24.1",
]
@@ -2365,18 +2367,18 @@ dependencies = [
[[package]]
name = "pin-project"
-version = "1.1.2"
+version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842"
+checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
-version = "1.1.2"
+version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
+checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
@@ -2385,9 +2387,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
-version = "0.2.10"
+version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
+checksum = "2c516611246607d0c04186886dbb3a754368ef82c79e9827a802c6d836dd111c"
[[package]]
name = "pin-utils"
@@ -2567,9 +2569,9 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.9.1"
+version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575"
+checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a"
dependencies = [
"aho-corasick",
"memchr",
@@ -2579,9 +2581,9 @@ dependencies = [
[[package]]
name = "regex-automata"
-version = "0.3.4"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294"
+checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69"
dependencies = [
"aho-corasick",
"memchr",
@@ -2617,7 +2619,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
- "rustls 0.21.5",
+ "rustls 0.21.6",
"rustls-pemfile",
"serde",
"serde_json",
@@ -2693,9 +2695,9 @@ dependencies = [
[[package]]
name = "rustix"
-version = "0.38.4"
+version = "0.38.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5"
+checksum = "172891ebdceb05aa0005f533a6cbfca599ddd7d966f6f5d4d9b2e70478e70399"
dependencies = [
"bitflags 2.3.3",
"errno",
@@ -2718,9 +2720,9 @@ dependencies = [
[[package]]
name = "rustls"
-version = "0.21.5"
+version = "0.21.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36"
+checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
dependencies = [
"log",
"ring",
@@ -2751,9 +2753,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
-version = "0.101.2"
+version = "0.101.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59"
+checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0"
dependencies = [
"ring",
"untrusted",
@@ -2865,18 +2867,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
-version = "1.0.180"
+version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed"
+checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.180"
+version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036"
+checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
dependencies = [
"proc-macro2",
"quote",
@@ -3033,7 +3035,7 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
dependencies = [
- "strum_macros 0.25.1",
+ "strum_macros 0.25.2",
]
[[package]]
@@ -3051,9 +3053,9 @@ dependencies = [
[[package]]
name = "strum_macros"
-version = "0.25.1"
+version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232"
+checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059"
dependencies = [
"heck",
"proc-macro2",
@@ -3092,9 +3094,9 @@ dependencies = [
[[package]]
name = "tempfile"
-version = "3.7.0"
+version = "3.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998"
+checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651"
dependencies = [
"cfg-if",
"fastrand 2.0.0",
@@ -3157,9 +3159,9 @@ dependencies = [
[[package]]
name = "time"
-version = "0.3.24"
+version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b79eabcd964882a646b3584543ccabeae7869e9ac32a46f6f22b7a5bd405308b"
+checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea"
dependencies = [
"deranged",
"serde",
@@ -3253,7 +3255,7 @@ version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
- "rustls 0.21.5",
+ "rustls 0.21.6",
"tokio",
]
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 2eaf96b0f2..903962f213 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -37,59 +37,19 @@ pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
-use futures::stream::{Stream, TryStreamExt};
+use futures::stream::TryStreamExt;
use std::fmt;
use std::fmt::Debug;
use tokio::task::JoinSet;
use datafusion_common::tree_node::Transformed;
use datafusion_common::DataFusionError;
+use std::any::Any;
use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::{any::Any, pin::Pin};
-/// Trait for types that stream [arrow::record_batch::RecordBatch]
-pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
- /// Returns the schema of this `RecordBatchStream`.
- ///
- /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
- /// stream should have the same schema as returned from this method.
- fn schema(&self) -> SchemaRef;
-}
-
-/// Trait for a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es
-pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
-
-/// EmptyRecordBatchStream can be used to create a RecordBatchStream
-/// that will produce no results
-pub struct EmptyRecordBatchStream {
- /// Schema wrapped by Arc
- schema: SchemaRef,
-}
-
-impl EmptyRecordBatchStream {
- /// Create an empty RecordBatchStream
- pub fn new(schema: SchemaRef) -> Self {
- Self { schema }
- }
-}
-
-impl RecordBatchStream for EmptyRecordBatchStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
-impl Stream for EmptyRecordBatchStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- Poll::Ready(None)
- }
-}
+// backwards compatibility
+pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+pub use stream::EmptyRecordBatchStream;
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs
index bdc2050b24..2b916b7ee2 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -17,7 +17,10 @@
//! Stream wrappers for physical operators
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
use crate::physical_plan::displayable;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
@@ -231,9 +234,9 @@ impl Stream for RecordBatchReceiverStream {
type Item = Result<RecordBatch>;
fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
@@ -276,10 +279,7 @@ where
{
type Item = Result<RecordBatch>;
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
@@ -297,6 +297,37 @@ where
}
}
+/// EmptyRecordBatchStream can be used to create a RecordBatchStream
+/// that will produce no results
+pub struct EmptyRecordBatchStream {
+ /// Schema wrapped by Arc
+ schema: SchemaRef,
+}
+
+impl EmptyRecordBatchStream {
+ /// Create an empty RecordBatchStream
+ pub fn new(schema: SchemaRef) -> Self {
+ Self { schema }
+ }
+}
+
+impl RecordBatchStream for EmptyRecordBatchStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
+impl Stream for EmptyRecordBatchStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ Poll::Ready(None)
+ }
+}
+
/// Stream wrapper that records `BaselineMetrics` for a particular
/// `[SendableRecordBatchStream]` (likely a partition)
pub(crate) struct ObservedStream {
@@ -326,9 +357,9 @@ impl futures::Stream for ObservedStream {
type Item = Result<RecordBatch>;
fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
let poll = self.inner.poll_next_unpin(cx);
self.baseline_metrics.record_poll(poll)
}
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index a9c8d0027b..b87e73cdf2 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -33,9 +33,11 @@ name = "datafusion_execution"
path = "src/lib.rs"
[dependencies]
+arrow = { workspace = true }
dashmap = "5.4.0"
datafusion-common = { path = "../common", version = "28.0.0" }
datafusion-expr = { path = "../expr", version = "28.0.0" }
+futures = "0.3"
hashbrown = { version = "0.14", features = ["raw"] }
log = "^0.4"
object_store = "0.6.1"
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 46ffe12942..57d77aa1dd 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -23,8 +23,10 @@ pub mod memory_pool;
pub mod object_store;
pub mod registry;
pub mod runtime_env;
+mod stream;
mod task;
pub use disk_manager::DiskManager;
pub use registry::FunctionRegistry;
+pub use stream::{RecordBatchStream, SendableRecordBatchStream};
pub use task::TaskContext;
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/stream.rs
similarity index 53%
copy from datafusion/execution/src/lib.rs
copy to datafusion/execution/src/stream.rs
index 46ffe12942..5a1a9aaa25 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/stream.rs
@@ -15,16 +15,19 @@
// specific language governing permissions and limitations
// under the License.
-//! DataFusion execution configuration and runtime structures
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::Result;
+use futures::Stream;
+use std::pin::Pin;
-pub mod config;
-pub mod disk_manager;
-pub mod memory_pool;
-pub mod object_store;
-pub mod registry;
-pub mod runtime_env;
-mod task;
+/// Trait for types that stream [arrow::record_batch::RecordBatch]
+pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
+ /// Returns the schema of this `RecordBatchStream`.
+ ///
+ /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
+ /// stream should have the same schema as returned from this method.
+ fn schema(&self) -> SchemaRef;
+}
-pub use disk_manager::DiskManager;
-pub use registry::FunctionRegistry;
-pub use task::TaskContext;
+/// Trait for a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es
+pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;