This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f6298db59 Minor: Move streams out of `physical_plan` module (#7234)
3f6298db59 is described below

commit 3f6298db59a87084973feb3e9d452f118dd51bd9
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 08:59:09 2023 -0500

    Minor: Move streams out of `physical_plan` module (#7234)
    
    * Minor: Move streams out of main physical_plan module
    
    * Update datafusion-cli Cargo
---
 datafusion-cli/Cargo.lock                      | 72 +++++++++++++-------------
 datafusion/core/src/physical_plan/mod.rs       | 50 ++----------------
 datafusion/core/src/physical_plan/stream.rs    | 51 ++++++++++++++----
 datafusion/execution/Cargo.toml                |  2 +
 datafusion/execution/src/lib.rs                |  2 +
 datafusion/execution/src/{lib.rs => stream.rs} | 25 +++++----
 6 files changed, 101 insertions(+), 101 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 22c8e69bc9..769b3ac367 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -814,9 +814,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.0.80"
+version = "1.0.82"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51f1226cd9da55587234753d1245dd5b132343ea240f26b6a9003d68706141ba"
+checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01"
 dependencies = [
  "jobserver",
  "libc",
@@ -1134,9 +1134,11 @@ dependencies = [
 name = "datafusion-execution"
 version = "28.0.0"
 dependencies = [
+ "arrow",
  "dashmap",
  "datafusion-common",
  "datafusion-expr",
+ "futures",
  "hashbrown 0.14.0",
  "log",
  "object_store",
@@ -1156,7 +1158,7 @@ dependencies = [
  "lazy_static",
  "sqlparser",
  "strum 0.25.0",
- "strum_macros 0.25.1",
+ "strum_macros 0.25.2",
 ]
 
 [[package]]
@@ -1222,9 +1224,9 @@ dependencies = [
 
 [[package]]
 name = "deranged"
-version = "0.3.6"
+version = "0.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01"
+checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929"
 
 [[package]]
 name = "difflib"
@@ -1735,7 +1737,7 @@ dependencies = [
  "futures-util",
  "http",
  "hyper",
- "rustls 0.21.5",
+ "rustls 0.21.6",
  "tokio",
  "tokio-rustls 0.24.1",
 ]
@@ -2365,18 +2367,18 @@ dependencies = [
 
 [[package]]
 name = "pin-project"
-version = "1.1.2"
+version = "1.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842"
+checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
 dependencies = [
  "pin-project-internal",
 ]
 
 [[package]]
 name = "pin-project-internal"
-version = "1.1.2"
+version = "1.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
+checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2385,9 +2387,9 @@ dependencies = [
 
 [[package]]
 name = "pin-project-lite"
-version = "0.2.10"
+version = "0.2.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
+checksum = "2c516611246607d0c04186886dbb3a754368ef82c79e9827a802c6d836dd111c"
 
 [[package]]
 name = "pin-utils"
@@ -2567,9 +2569,9 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.9.1"
+version = "1.9.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575"
+checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -2579,9 +2581,9 @@ dependencies = [
 
 [[package]]
 name = "regex-automata"
-version = "0.3.4"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294"
+checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -2617,7 +2619,7 @@ dependencies = [
  "once_cell",
  "percent-encoding",
  "pin-project-lite",
- "rustls 0.21.5",
+ "rustls 0.21.6",
  "rustls-pemfile",
  "serde",
  "serde_json",
@@ -2693,9 +2695,9 @@ dependencies = [
 
 [[package]]
 name = "rustix"
-version = "0.38.4"
+version = "0.38.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5"
+checksum = "172891ebdceb05aa0005f533a6cbfca599ddd7d966f6f5d4d9b2e70478e70399"
 dependencies = [
  "bitflags 2.3.3",
  "errno",
@@ -2718,9 +2720,9 @@ dependencies = [
 
 [[package]]
 name = "rustls"
-version = "0.21.5"
+version = "0.21.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36"
+checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
 dependencies = [
  "log",
  "ring",
@@ -2751,9 +2753,9 @@ dependencies = [
 
 [[package]]
 name = "rustls-webpki"
-version = "0.101.2"
+version = "0.101.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59"
+checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0"
 dependencies = [
  "ring",
  "untrusted",
@@ -2865,18 +2867,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
 
 [[package]]
 name = "serde"
-version = "1.0.180"
+version = "1.0.183"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed"
+checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.180"
+version = "1.0.183"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036"
+checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3033,7 +3035,7 @@ version = "0.25.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
 dependencies = [
- "strum_macros 0.25.1",
+ "strum_macros 0.25.2",
 ]
 
 [[package]]
@@ -3051,9 +3053,9 @@ dependencies = [
 
 [[package]]
 name = "strum_macros"
-version = "0.25.1"
+version = "0.25.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232"
+checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -3092,9 +3094,9 @@ dependencies = [
 
 [[package]]
 name = "tempfile"
-version = "3.7.0"
+version = "3.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998"
+checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651"
 dependencies = [
  "cfg-if",
  "fastrand 2.0.0",
@@ -3157,9 +3159,9 @@ dependencies = [
 
 [[package]]
 name = "time"
-version = "0.3.24"
+version = "0.3.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b79eabcd964882a646b3584543ccabeae7869e9ac32a46f6f22b7a5bd405308b"
+checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea"
 dependencies = [
  "deranged",
  "serde",
@@ -3253,7 +3255,7 @@ version = "0.24.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
 dependencies = [
- "rustls 0.21.5",
+ "rustls 0.21.6",
  "tokio",
 ]
 
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 2eaf96b0f2..903962f213 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -37,59 +37,19 @@ pub use datafusion_expr::Accumulator;
 pub use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
 pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
-use futures::stream::{Stream, TryStreamExt};
+use futures::stream::TryStreamExt;
 use std::fmt;
 use std::fmt::Debug;
 use tokio::task::JoinSet;
 
 use datafusion_common::tree_node::Transformed;
 use datafusion_common::DataFusionError;
+use std::any::Any;
 use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::{any::Any, pin::Pin};
 
-/// Trait for types that stream [arrow::record_batch::RecordBatch]
-pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
-    /// Returns the schema of this `RecordBatchStream`.
-    ///
-    /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
-    /// stream should have the same schema as returned from this method.
-    fn schema(&self) -> SchemaRef;
-}
-
-/// Trait for a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es
-pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
-
-/// EmptyRecordBatchStream can be used to create a RecordBatchStream
-/// that will produce no results
-pub struct EmptyRecordBatchStream {
-    /// Schema wrapped by Arc
-    schema: SchemaRef,
-}
-
-impl EmptyRecordBatchStream {
-    /// Create an empty RecordBatchStream
-    pub fn new(schema: SchemaRef) -> Self {
-        Self { schema }
-    }
-}
-
-impl RecordBatchStream for EmptyRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-impl Stream for EmptyRecordBatchStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(
-        self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        Poll::Ready(None)
-    }
-}
+// backwards compatibility
+pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+pub use stream::EmptyRecordBatchStream;
 
 /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
 ///
diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs
index bdc2050b24..2b916b7ee2 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -17,7 +17,10 @@
 
 //! Stream wrappers for physical operators
 
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
 
 use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
@@ -231,9 +234,9 @@ impl Stream for RecordBatchReceiverStream {
     type Item = Result<RecordBatch>;
 
     fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         self.inner.poll_next_unpin(cx)
     }
 }
@@ -276,10 +279,7 @@ where
 {
     type Item = Result<RecordBatch>;
 
-    fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         self.project().stream.poll_next(cx)
     }
 
@@ -297,6 +297,37 @@ where
     }
 }
 
+/// EmptyRecordBatchStream can be used to create a RecordBatchStream
+/// that will produce no results
+pub struct EmptyRecordBatchStream {
+    /// Schema wrapped by Arc
+    schema: SchemaRef,
+}
+
+impl EmptyRecordBatchStream {
+    /// Create an empty RecordBatchStream
+    pub fn new(schema: SchemaRef) -> Self {
+        Self { schema }
+    }
+}
+
+impl RecordBatchStream for EmptyRecordBatchStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for EmptyRecordBatchStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        Poll::Ready(None)
+    }
+}
+
 /// Stream wrapper that records `BaselineMetrics` for a particular
 /// `[SendableRecordBatchStream]` (likely a partition)
 pub(crate) struct ObservedStream {
@@ -326,9 +357,9 @@ impl futures::Stream for ObservedStream {
     type Item = Result<RecordBatch>;
 
     fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         let poll = self.inner.poll_next_unpin(cx);
         self.baseline_metrics.record_poll(poll)
     }
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index a9c8d0027b..b87e73cdf2 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -33,9 +33,11 @@ name = "datafusion_execution"
 path = "src/lib.rs"
 
 [dependencies]
+arrow = { workspace = true }
 dashmap = "5.4.0"
 datafusion-common = { path = "../common", version = "28.0.0" }
 datafusion-expr = { path = "../expr", version = "28.0.0" }
+futures = "0.3"
 hashbrown = { version = "0.14", features = ["raw"] }
 log = "^0.4"
 object_store = "0.6.1"
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 46ffe12942..57d77aa1dd 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -23,8 +23,10 @@ pub mod memory_pool;
 pub mod object_store;
 pub mod registry;
 pub mod runtime_env;
+mod stream;
 mod task;
 
 pub use disk_manager::DiskManager;
 pub use registry::FunctionRegistry;
+pub use stream::{RecordBatchStream, SendableRecordBatchStream};
 pub use task::TaskContext;
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/stream.rs
similarity index 53%
copy from datafusion/execution/src/lib.rs
copy to datafusion/execution/src/stream.rs
index 46ffe12942..5a1a9aaa25 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/stream.rs
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! DataFusion execution configuration and runtime structures
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::Result;
+use futures::Stream;
+use std::pin::Pin;
 
-pub mod config;
-pub mod disk_manager;
-pub mod memory_pool;
-pub mod object_store;
-pub mod registry;
-pub mod runtime_env;
-mod task;
+/// Trait for types that stream [arrow::record_batch::RecordBatch]
+pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
+    /// Returns the schema of this `RecordBatchStream`.
+    ///
+    /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
+    /// stream should have the same schema as returned from this method.
+    fn schema(&self) -> SchemaRef;
+}
 
-pub use disk_manager::DiskManager;
-pub use registry::FunctionRegistry;
-pub use task::TaskContext;
+/// Trait for a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es
+pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

Reply via email to