This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fca1df945e Add `StreamProvider` for configuring `StreamTable` (#10600)
fca1df945e is described below

commit fca1df945ec3a4b51366ecc4f3dd758e03530414
Author: Matthew Turner <[email protected]>
AuthorDate: Thu Jun 6 08:32:19 2024 -0400

    Add `StreamProvider` for configuring `StreamTable` (#10600)
    
    * Start setting up new StreamTable config
    
    * Cleanup
    
    * Cleanup
    
    * Fix some tests
    
    * Cleanup
    
    * Start adding example
    
    * Feedback
---
 datafusion-examples/Cargo.toml                     |   3 +
 datafusion-examples/README.md                      |   1 +
 .../examples/file_stream_provider.rs               | 186 +++++++++++++++++++++
 datafusion/core/src/datasource/stream.rs           | 140 ++++++++++++----
 .../core/src/physical_optimizer/test_utils.rs      |   5 +-
 datafusion/core/src/test_util/mod.rs               |   6 +-
 datafusion/core/tests/fifo/mod.rs                  |   6 +-
 datafusion/core/tests/sql/joins.rs                 |  14 +-
 8 files changed, 311 insertions(+), 50 deletions(-)
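
For reference, the migration this commit asks of callers is visible in the test updates below: source-specific
options (encoding, header, batch size) now live on `FileStreamProvider`, while ordering and constraints stay on
`StreamConfig`. A minimal sketch of the new flow, distilled from the example and tests in this diff (the path,
batch size, and schema here are placeholders, not values from the commit):

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
    use datafusion::prelude::SessionContext;
    use datafusion_common::Result;

    fn register_fifo_table(ctx: &SessionContext) -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a1", DataType::UInt32, false),
            Field::new("a2", DataType::UInt32, false),
        ]));

        // Source-specific settings now live on the provider...
        let source = FileStreamProvider::new_file(schema, "/tmp/fifo_unbounded.csv".into())
            .with_batch_size(8192) // placeholder batch size
            .with_header(true);

        // ...while ordering (and constraints) remain on `StreamConfig`.
        let config = StreamConfig::new(Arc::new(source))
            .with_order(vec![vec![datafusion_expr::col("a1").sort(true, false)]]);

        ctx.register_table("fifo", Arc::new(StreamTable::new(Arc::new(config))))?;
        Ok(())
    }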

diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 0074a2b8d4..0bcf7c1afc 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -80,3 +80,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
 tonic = "0.11"
 url = { workspace = true }
 uuid = "1.7"
+
+[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
+nix = { version = "0.28.0", features = ["fs"] }
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index a5395ea7aa..c34f706adb 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -56,6 +56,7 @@ cargo run --example csv_sql
 - [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
+- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
diff --git a/datafusion-examples/examples/file_stream_provider.rs b/datafusion-examples/examples/file_stream_provider.rs
new file mode 100644
index 0000000000..4e79f9afc2
--- /dev/null
+++ b/datafusion-examples/examples/file_stream_provider.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::assert_batches_eq;
+use datafusion_common::instant::Instant;
+use std::fs::{File, OpenOptions};
+use std::io::Write;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use nix::sys::stat;
+use nix::unistd;
+use tempfile::TempDir;
+use tokio::task::JoinSet;
+
+use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
+use datafusion::datasource::TableProvider;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::{exec_err, Result};
+use datafusion_expr::Expr;
+
+// Number of lines written to FIFO
+const TEST_BATCH_SIZE: usize = 5;
+const TEST_DATA_SIZE: usize = 5;
+
+/// Makes a TableProvider for a fifo file using `StreamTable` with the `StreamProvider` trait
+fn fifo_table(
+    schema: SchemaRef,
+    path: impl Into<PathBuf>,
+    sort: Vec<Vec<Expr>>,
+) -> Arc<dyn TableProvider> {
+    let source = FileStreamProvider::new_file(schema, path.into())
+        .with_batch_size(TEST_BATCH_SIZE)
+        .with_header(true);
+    let config = StreamConfig::new(Arc::new(source)).with_order(sort);
+    Arc::new(StreamTable::new(Arc::new(config)))
+}
+
+fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
+    let file_path = tmp_dir.path().join(file_name);
+    // Simulate an infinite environment via a FIFO file
+    if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
+        exec_err!("{}", e)
+    } else {
+        Ok(file_path)
+    }
+}
+
+fn write_to_fifo(
+    mut file: &File,
+    line: &str,
+    ref_time: Instant,
+    broken_pipe_timeout: Duration,
+) -> Result<()> {
+    // We need to handle broken pipe error until the reader is ready. This
+    // is why we use a timeout to limit the wait duration for the reader.
+    // If the error is different than broken pipe, we fail immediately.
+    while let Err(e) = file.write_all(line.as_bytes()) {
+        if e.raw_os_error().unwrap() == 32 {
+            let interval = Instant::now().duration_since(ref_time);
+            if interval < broken_pipe_timeout {
+                thread::sleep(Duration::from_millis(100));
+                continue;
+            }
+        }
+        return exec_err!("{}", e);
+    }
+    Ok(())
+}
+
+fn create_writing_thread(
+    file_path: PathBuf,
+    maybe_header: Option<String>,
+    lines: Vec<String>,
+    waiting_lock: Arc<AtomicBool>,
+    wait_until: usize,
+    tasks: &mut JoinSet<()>,
+) {
+    // Timeout for a long period of BrokenPipe error
+    let broken_pipe_timeout = Duration::from_secs(10);
+    let sa = file_path.clone();
+    // Spawn a new thread to write to the FIFO file
+    #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
+    tasks.spawn_blocking(move || {
+        let file = OpenOptions::new().write(true).open(sa).unwrap();
+        // Reference time to use when deciding to fail the test
+        let execution_start = Instant::now();
+        if let Some(header) = maybe_header {
+            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
+        }
+        for (cnt, line) in lines.iter().enumerate() {
+            while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+                thread::sleep(Duration::from_millis(50));
+            }
+            write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
+        }
+        drop(file);
+    });
+}
+
+/// This example demonstrates running a query against a FIFO file (simulating an
+/// infinite streaming source) with `FileStreamProvider` and fetching the results
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Create session context
+    let config = SessionConfig::new()
+        .with_batch_size(TEST_BATCH_SIZE)
+        .with_collect_statistics(false)
+        .with_target_partitions(1);
+    let ctx = SessionContext::new_with_config(config);
+    let tmp_dir = TempDir::new()?;
+    let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
+
+    let mut tasks: JoinSet<()> = JoinSet::new();
+    let waiting = Arc::new(AtomicBool::new(true));
+
+    let data_iter = 0..TEST_DATA_SIZE;
+    let lines = data_iter
+        .map(|i| format!("{},{}\n", i, i + 1))
+        .collect::<Vec<_>>();
+
+    create_writing_thread(
+        fifo_path.clone(),
+        Some("a1,a2\n".to_owned()),
+        lines.clone(),
+        waiting.clone(),
+        TEST_DATA_SIZE,
+        &mut tasks,
+    );
+
+    // Create schema
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+
+    // Specify the ordering:
+    let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
+
+    let provider = fifo_table(schema.clone(), fifo_path, order.clone());
+    ctx.register_table("fifo", provider)?;
+
+    let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
+    let mut stream = df.execute_stream().await.unwrap();
+
+    let mut batches = Vec::new();
+    if let Some(Ok(batch)) = stream.next().await {
+        batches.push(batch)
+    }
+
+    let expected = vec![
+        "+----+----+",
+        "| a1 | a2 |",
+        "+----+----+",
+        "| 0  | 1  |",
+        "| 1  | 2  |",
+        "| 2  | 3  |",
+        "| 3  | 4  |",
+        "| 4  | 5  |",
+        "+----+----+",
+    ];
+
+    assert_batches_eq!(&expected, &batches);
+
+    Ok(())
+}
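
Assuming the usual examples workflow shown in the README above, the new example should be runnable with
`cargo run --example file_stream_provider` from the `datafusion-examples` crate, and only on non-Windows
targets, since the FIFO setup relies on the `nix` dependency added in `Cargo.toml` above.
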
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index bcce3c1b64..9cfdb7bb11 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -71,11 +71,13 @@ impl TableProviderFactory for StreamTableFactory {
             );
         };
 
-        let config = StreamConfig::new_file(schema, location.into())
+        let source = FileStreamProvider::new_file(schema, location.into())
             .with_encoding(encoding)
-            .with_order(cmd.order_exprs.clone())
             .with_batch_size(state.config().batch_size())
-            .with_header(header)
+            .with_header(header);
+
+        let config = StreamConfig::new(Arc::new(source))
+            .with_order(cmd.order_exprs.clone())
             .with_constraints(cmd.constraints.clone());
 
         Ok(Arc::new(StreamTable(Arc::new(config))))
@@ -103,19 +105,44 @@ impl FromStr for StreamEncoding {
     }
 }
 
-/// The configuration for a [`StreamTable`]
+/// The StreamProvider trait is used as a generic interface for reading from and writing to streaming
+/// data sources (such as FIFO, Websocket, Kafka, etc.).  Implementations of the provider are
+/// responsible for providing a `RecordBatchReader` and optionally a `RecordBatchWriter`.
+pub trait StreamProvider: std::fmt::Debug + Send + Sync {
+    /// Get a reference to the schema for this stream
+    fn schema(&self) -> &SchemaRef;
+    /// Provide `RecordBatchReader`
+    fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
+    /// Provide `RecordBatchWriter`
+    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+        unimplemented!()
+    }
+    /// Display implementation when using as a DataSink
+    fn stream_write_display(
+        &self,
+        t: DisplayFormatType,
+        f: &mut Formatter,
+    ) -> std::fmt::Result;
+}
+
+/// Stream data from the file at `location`
+///
+/// * Data will be read sequentially from the provided `location`
+/// * New data will be appended to the end of the file
+///
+/// The encoding can be configured with [`Self::with_encoding`] and
+/// defaults to [`StreamEncoding::Csv`]
 #[derive(Debug)]
-pub struct StreamConfig {
-    schema: SchemaRef,
+pub struct FileStreamProvider {
     location: PathBuf,
-    batch_size: usize,
     encoding: StreamEncoding,
+    /// Get a reference to the schema for this file stream
+    pub schema: SchemaRef,
     header: bool,
-    order: Vec<Vec<Expr>>,
-    constraints: Constraints,
+    batch_size: usize,
 }
 
-impl StreamConfig {
+impl FileStreamProvider {
     /// Stream data from the file at `location`
     ///
     /// * Data will be read sequentially from the provided `location`
@@ -129,19 +156,11 @@ impl StreamConfig {
             location,
             batch_size: 1024,
             encoding: StreamEncoding::Csv,
-            order: vec![],
             header: false,
-            constraints: Constraints::empty(),
         }
     }
 
-    /// Specify a sort order for the stream
-    pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
-        self.order = order;
-        self
-    }
-
-    /// Specify the batch size
+    /// Set the batch size (the number of rows to load at one time)
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
         self.batch_size = batch_size;
         self
@@ -158,11 +177,11 @@ impl StreamConfig {
         self.encoding = encoding;
         self
     }
+}
 
-    /// Assign constraints
-    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
-        self.constraints = constraints;
-        self
+impl StreamProvider for FileStreamProvider {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 
     fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
@@ -210,6 +229,58 @@ impl StreamConfig {
             }
         }
     }
+
+    fn stream_write_display(
+        &self,
+        _t: DisplayFormatType,
+        f: &mut Formatter,
+    ) -> std::fmt::Result {
+        f.debug_struct("StreamWrite")
+            .field("location", &self.location)
+            .field("batch_size", &self.batch_size)
+            .field("encoding", &self.encoding)
+            .field("header", &self.header)
+            .finish_non_exhaustive()
+    }
+}
+
+/// The configuration for a [`StreamTable`]
+#[derive(Debug)]
+pub struct StreamConfig {
+    source: Arc<dyn StreamProvider>,
+    order: Vec<Vec<Expr>>,
+    constraints: Constraints,
+}
+
+impl StreamConfig {
+    /// Create a new `StreamConfig` from a `StreamProvider`
+    pub fn new(source: Arc<dyn StreamProvider>) -> Self {
+        Self {
+            source,
+            order: vec![],
+            constraints: Constraints::empty(),
+        }
+    }
+
+    /// Specify a sort order for the stream
+    pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
+        self.order = order;
+        self
+    }
+
+    /// Assign constraints
+    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+        self.constraints = constraints;
+        self
+    }
+
+    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
+        self.source.reader()
+    }
+
+    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+        self.source.writer()
+    }
 }
 
 /// A [`TableProvider`] for an unbounded stream source
@@ -238,7 +309,7 @@ impl TableProvider for StreamTable {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.0.schema.clone()
+        self.0.source.schema().clone()
     }
 
     fn constraints(&self) -> Option<&Constraints> {
@@ -258,14 +329,14 @@ impl TableProvider for StreamTable {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let projected_schema = match projection {
             Some(p) => {
-                let projected = self.0.schema.project(p)?;
+                let projected = self.0.source.schema().project(p)?;
                 create_ordering(&projected, &self.0.order)?
             }
-            None => create_ordering(self.0.schema.as_ref(), &self.0.order)?,
+            None => create_ordering(self.0.source.schema(), &self.0.order)?,
         };
 
         Ok(Arc::new(StreamingTableExec::try_new(
-            self.0.schema.clone(),
+            self.0.source.schema().clone(),
             vec![Arc::new(StreamRead(self.0.clone())) as _],
             projection,
             projected_schema,
@@ -282,7 +353,7 @@ impl TableProvider for StreamTable {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let ordering = match self.0.order.first() {
             Some(x) => {
-                let schema = self.0.schema.as_ref();
+                let schema = self.0.source.schema();
                 let orders = create_ordering(schema, std::slice::from_ref(x))?;
                 let ordering = orders.into_iter().next().unwrap();
                 Some(ordering.into_iter().map(Into::into).collect())
@@ -293,7 +364,7 @@ impl TableProvider for StreamTable {
         Ok(Arc::new(DataSinkExec::new(
             input,
             Arc::new(StreamWrite(self.0.clone())),
-            self.0.schema.clone(),
+            self.0.source.schema().clone(),
             ordering,
         )))
     }
@@ -303,12 +374,12 @@ struct StreamRead(Arc<StreamConfig>);
 
 impl PartitionStream for StreamRead {
     fn schema(&self) -> &SchemaRef {
-        &self.0.schema
+        self.0.source.schema()
     }
 
     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let config = self.0.clone();
-        let schema = self.0.schema.clone();
+        let schema = self.0.source.schema().clone();
         let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
         let tx = builder.tx();
         builder.spawn_blocking(move || {
@@ -329,12 +400,7 @@ struct StreamWrite(Arc<StreamConfig>);
 
 impl DisplayAs for StreamWrite {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        f.debug_struct("StreamWrite")
-            .field("location", &self.0.location)
-            .field("batch_size", &self.0.batch_size)
-            .field("encoding", &self.0.encoding)
-            .field("header", &self.0.header)
-            .finish_non_exhaustive()
+        self.0.source.stream_write_display(_t, f)
     }
 }
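
Beyond the file-backed implementation above, the point of the new trait is that third parties can plug in their
own sources. A minimal, hypothetical sketch of an in-memory provider (the name `InMemoryStreamProvider` and the
exact import paths are assumptions for illustration, not part of this commit):

    use std::fmt::Formatter;
    use std::sync::Arc;

    use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader};
    use arrow_schema::SchemaRef;
    use datafusion::datasource::stream::{StreamConfig, StreamProvider, StreamTable};
    use datafusion::physical_plan::DisplayFormatType;
    use datafusion_common::Result;

    /// Hypothetical provider that replays a fixed set of in-memory batches
    #[derive(Debug)]
    struct InMemoryStreamProvider {
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    }

    impl StreamProvider for InMemoryStreamProvider {
        fn schema(&self) -> &SchemaRef {
            &self.schema
        }

        fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
            // A real source would read from a socket, broker, etc.;
            // here we simply replay the stored batches.
            Ok(Box::new(RecordBatchIterator::new(
                self.batches.clone().into_iter().map(Ok),
                self.schema.clone(),
            )))
        }

        // `writer` is optional and falls back to the default (unimplemented)

        fn stream_write_display(
            &self,
            _t: DisplayFormatType,
            f: &mut Formatter,
        ) -> std::fmt::Result {
            f.debug_struct("InMemoryStreamWrite").finish_non_exhaustive()
        }
    }

    /// Wrap the custom provider exactly as `FileStreamProvider` is wrapped above
    fn in_memory_table(schema: SchemaRef, batches: Vec<RecordBatch>) -> Arc<StreamTable> {
        let source = InMemoryStreamProvider { schema, batches };
        Arc::new(StreamTable::new(Arc::new(StreamConfig::new(Arc::new(source)))))
    }
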
 
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index cfd0312f81..5895c39a5f 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
-use crate::datasource::stream::{StreamConfig, StreamTable};
+use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use crate::error::Result;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
 use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
@@ -62,7 +62,8 @@ async fn register_current_csv(
 
     match infinite {
         true => {
-            let config = StreamConfig::new_file(schema, path.into());
+            let source = FileStreamProvider::new_file(schema, path.into());
+            let config = StreamConfig::new(Arc::new(source));
             ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
         }
         false => {
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 7aec66825d..e876cfe465 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -31,7 +31,7 @@ use std::task::{Context, Poll};
 
 use crate::dataframe::DataFrame;
 use crate::datasource::provider::TableProviderFactory;
-use crate::datasource::stream::{StreamConfig, StreamTable};
+use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
 use crate::error::Result;
 use crate::execution::context::{SessionState, TaskContext};
@@ -355,8 +355,8 @@ pub fn register_unbounded_file_with_ordering(
     table_name: &str,
     file_sort_order: Vec<Vec<Expr>>,
 ) -> Result<()> {
-    let config =
-        StreamConfig::new_file(schema, file_path.into()).with_order(file_sort_order);
+    let source = FileStreamProvider::new_file(schema, file_path.into());
+    let config = StreamConfig::new(Arc::new(source)).with_order(file_sort_order);
 
     // Register table:
     ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
diff --git a/datafusion/core/tests/fifo/mod.rs b/datafusion/core/tests/fifo/mod.rs
index a63240d03d..2e21abffab 100644
--- a/datafusion/core/tests/fifo/mod.rs
+++ b/datafusion/core/tests/fifo/mod.rs
@@ -39,7 +39,7 @@ mod unix_test {
     use tempfile::TempDir;
     use tokio::task::{spawn_blocking, JoinHandle};
 
-    use datafusion::datasource::stream::{StreamConfig, StreamTable};
+    use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
     use datafusion::datasource::TableProvider;
     use datafusion::{
         prelude::{CsvReadOptions, SessionConfig, SessionContext},
@@ -54,10 +54,10 @@ mod unix_test {
         path: impl Into<PathBuf>,
         sort: Vec<Vec<Expr>>,
     ) -> Arc<dyn TableProvider> {
-        let config = StreamConfig::new_file(schema, path.into())
-            .with_order(sort)
+        let source = FileStreamProvider::new_file(schema, path.into())
             .with_batch_size(TEST_BATCH_SIZE)
             .with_header(true);
+        let config = StreamConfig::new(Arc::new(source)).with_order(sort);
         Arc::new(StreamTable::new(Arc::new(config)))
     }
 
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index f7d5205db0..fad9b94b01 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::datasource::stream::{StreamConfig, StreamTable};
+use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use datafusion::test_util::register_unbounded_file_with_ordering;
 
 use super::*;
@@ -166,12 +166,14 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
         Field::new("a1", DataType::UInt32, false),
         Field::new("a2", DataType::UInt32, false),
     ]));
-    let left = StreamConfig::new_file(schema.clone(), left_file_path);
+    let left_source = FileStreamProvider::new_file(schema.clone(), left_file_path);
+    let left = StreamConfig::new(Arc::new(left_source));
     ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
 
     let right_file_path = tmp_dir.path().join("right.csv");
     File::create(right_file_path.clone())?;
-    let right = StreamConfig::new_file(schema, right_file_path);
+    let right_source = FileStreamProvider::new_file(schema, right_file_path);
+    let right = StreamConfig::new(Arc::new(right_source));
     ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
     let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN 
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
     let dataframe = ctx.sql(sql).await?;
@@ -216,11 +218,13 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
         Field::new("a1", DataType::UInt32, false),
         Field::new("a2", DataType::UInt32, false),
     ]));
-    let left = StreamConfig::new_file(schema.clone(), left_file_path);
+    let left_source = FileStreamProvider::new_file(schema.clone(), left_file_path);
+    let left = StreamConfig::new(Arc::new(left_source));
     ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
     let right_file_path = tmp_dir.path().join("right.csv");
     File::create(right_file_path.clone())?;
-    let right = StreamConfig::new_file(schema.clone(), right_file_path);
+    let right_source = FileStreamProvider::new_file(schema.clone(), right_file_path);
+    let right = StreamConfig::new(Arc::new(right_source));
     ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
     let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL 
JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 
10").await?;
     match df.create_physical_plan().await {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
