This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fca1df945e Add `StreamProvider` for configuring `StreamTable` (#10600)
fca1df945e is described below
commit fca1df945ec3a4b51366ecc4f3dd758e03530414
Author: Matthew Turner <[email protected]>
AuthorDate: Thu Jun 6 08:32:19 2024 -0400
Add `StreamProvider` for configuring `StreamTable` (#10600)
* Start setting up new StreamTable config
* Cleanup
* Cleanup
* Fix some tests
* Cleanup
* Start adding example
* Feedback
---
datafusion-examples/Cargo.toml | 3 +
datafusion-examples/README.md | 1 +
.../examples/file_stream_provider.rs | 186 +++++++++++++++++++++
datafusion/core/src/datasource/stream.rs | 140 ++++++++++++----
.../core/src/physical_optimizer/test_utils.rs | 5 +-
datafusion/core/src/test_util/mod.rs | 6 +-
datafusion/core/tests/fifo/mod.rs | 6 +-
datafusion/core/tests/sql/joins.rs | 14 +-
8 files changed, 311 insertions(+), 50 deletions(-)
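As an orientation before the diffs: a minimal sketch of the new configuration flow introduced by this commit. The file path and batch size below are hypothetical; the types and builder methods (`FileStreamProvider::new_file`, `with_batch_size`, `with_header`, `StreamConfig::new`, `StreamTable::new`) are the ones added in the stream.rs diff below.

use std::path::PathBuf;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::datasource::TableProvider;

fn build_stream_table(path: PathBuf) -> Arc<dyn TableProvider> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a1", DataType::UInt32, false),
        Field::new("a2", DataType::UInt32, false),
    ]));
    // Source-level settings (location, encoding, header, batch size) now live on the provider...
    let source = FileStreamProvider::new_file(schema, path)
        .with_batch_size(8192)
        .with_header(true);
    // ...while table-level settings (sort order, constraints) stay on StreamConfig.
    let config = StreamConfig::new(Arc::new(source));
    Arc::new(StreamTable::new(Arc::new(config)))
}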
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 0074a2b8d4..0bcf7c1afc 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -80,3 +80,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.11"
url = { workspace = true }
uuid = "1.7"
+
+[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
+nix = { version = "0.28.0", features = ["fs"] }
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index a5395ea7aa..c34f706adb 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -56,6 +56,7 @@ cargo run --example csv_sql
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
+- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
diff --git a/datafusion-examples/examples/file_stream_provider.rs b/datafusion-examples/examples/file_stream_provider.rs
new file mode 100644
index 0000000000..4e79f9afc2
--- /dev/null
+++ b/datafusion-examples/examples/file_stream_provider.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::assert_batches_eq;
+use datafusion_common::instant::Instant;
+use std::fs::{File, OpenOptions};
+use std::io::Write;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use nix::sys::stat;
+use nix::unistd;
+use tempfile::TempDir;
+use tokio::task::JoinSet;
+
+use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
+use datafusion::datasource::TableProvider;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::{exec_err, Result};
+use datafusion_expr::Expr;
+
+// Number of lines written to FIFO
+const TEST_BATCH_SIZE: usize = 5;
+const TEST_DATA_SIZE: usize = 5;
+
+/// Makes a TableProvider for a fifo file using `StreamTable` with the `StreamProvider` trait
+fn fifo_table(
+ schema: SchemaRef,
+ path: impl Into<PathBuf>,
+ sort: Vec<Vec<Expr>>,
+) -> Arc<dyn TableProvider> {
+ let source = FileStreamProvider::new_file(schema, path.into())
+ .with_batch_size(TEST_BATCH_SIZE)
+ .with_header(true);
+ let config = StreamConfig::new(Arc::new(source)).with_order(sort);
+ Arc::new(StreamTable::new(Arc::new(config)))
+}
+
+fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
+ let file_path = tmp_dir.path().join(file_name);
+ // Simulate an infinite environment via a FIFO file
+ if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
+ exec_err!("{}", e)
+ } else {
+ Ok(file_path)
+ }
+}
+
+fn write_to_fifo(
+ mut file: &File,
+ line: &str,
+ ref_time: Instant,
+ broken_pipe_timeout: Duration,
+) -> Result<()> {
+ // We need to handle broken pipe error until the reader is ready. This
+ // is why we use a timeout to limit the wait duration for the reader.
+ // If the error is different than broken pipe, we fail immediately.
+ while let Err(e) = file.write_all(line.as_bytes()) {
+ if e.raw_os_error().unwrap() == 32 {
+ let interval = Instant::now().duration_since(ref_time);
+ if interval < broken_pipe_timeout {
+ thread::sleep(Duration::from_millis(100));
+ continue;
+ }
+ }
+ return exec_err!("{}", e);
+ }
+ Ok(())
+}
+
+fn create_writing_thread(
+ file_path: PathBuf,
+ maybe_header: Option<String>,
+ lines: Vec<String>,
+ waiting_lock: Arc<AtomicBool>,
+ wait_until: usize,
+ tasks: &mut JoinSet<()>,
+) {
+ // Timeout for a long period of BrokenPipe error
+ let broken_pipe_timeout = Duration::from_secs(10);
+ let sa = file_path.clone();
+ // Spawn a new thread to write to the FIFO file
+ #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
+ tasks.spawn_blocking(move || {
+ let file = OpenOptions::new().write(true).open(sa).unwrap();
+ // Reference time to use when deciding to fail the test
+ let execution_start = Instant::now();
+ if let Some(header) = maybe_header {
+ write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
+ }
+ for (cnt, line) in lines.iter().enumerate() {
+ while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+ thread::sleep(Duration::from_millis(50));
+ }
+ write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
+ }
+ drop(file);
+ });
+}
+
+/// This example demonstrates reading CSV data from a FIFO file as an unbounded
+/// stream via `FileStreamProvider` and fetching the results
+#[tokio::main]
+async fn main() -> Result<()> {
+ // Create session context
+ let config = SessionConfig::new()
+ .with_batch_size(TEST_BATCH_SIZE)
+ .with_collect_statistics(false)
+ .with_target_partitions(1);
+ let ctx = SessionContext::new_with_config(config);
+ let tmp_dir = TempDir::new()?;
+ let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
+
+ let mut tasks: JoinSet<()> = JoinSet::new();
+ let waiting = Arc::new(AtomicBool::new(true));
+
+ let data_iter = 0..TEST_DATA_SIZE;
+ let lines = data_iter
+ .map(|i| format!("{},{}\n", i, i + 1))
+ .collect::<Vec<_>>();
+
+ create_writing_thread(
+ fifo_path.clone(),
+ Some("a1,a2\n".to_owned()),
+ lines.clone(),
+ waiting.clone(),
+ TEST_DATA_SIZE,
+ &mut tasks,
+ );
+
+ // Create schema
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::UInt32, false),
+ Field::new("a2", DataType::UInt32, false),
+ ]));
+
+ // Specify the ordering:
+ let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
+
+ let provider = fifo_table(schema.clone(), fifo_path, order.clone());
+ ctx.register_table("fifo", provider)?;
+
+ let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
+ let mut stream = df.execute_stream().await.unwrap();
+
+ let mut batches = Vec::new();
+ if let Some(Ok(batch)) = stream.next().await {
+ batches.push(batch)
+ }
+
+ let expected = vec![
+ "+----+----+",
+ "| a1 | a2 |",
+ "+----+----+",
+ "| 0 | 1 |",
+ "| 1 | 2 |",
+ "| 2 | 3 |",
+ "| 3 | 4 |",
+ "| 4 | 5 |",
+ "+----+----+",
+ ];
+
+ assert_batches_eq!(&expected, &batches);
+
+ Ok(())
+}
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index bcce3c1b64..9cfdb7bb11 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -71,11 +71,13 @@ impl TableProviderFactory for StreamTableFactory {
);
};
- let config = StreamConfig::new_file(schema, location.into())
+ let source = FileStreamProvider::new_file(schema, location.into())
.with_encoding(encoding)
- .with_order(cmd.order_exprs.clone())
.with_batch_size(state.config().batch_size())
- .with_header(header)
+ .with_header(header);
+
+ let config = StreamConfig::new(Arc::new(source))
+ .with_order(cmd.order_exprs.clone())
.with_constraints(cmd.constraints.clone());
Ok(Arc::new(StreamTable(Arc::new(config))))
@@ -103,19 +105,44 @@ impl FromStr for StreamEncoding {
}
}
-/// The configuration for a [`StreamTable`]
+/// The StreamProvider trait is used as a generic interface for reading from and writing to streaming
+/// data sources (such as FIFO files, WebSockets, Kafka, etc.). Implementations of the provider are
+/// responsible for providing a `RecordBatchReader` and optionally a `RecordBatchWriter`.
+pub trait StreamProvider: std::fmt::Debug + Send + Sync {
+ /// Get a reference to the schema for this stream
+ fn schema(&self) -> &SchemaRef;
+ /// Provide `RecordBatchReader`
+ fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
+ /// Provide `RecordBatchWriter`
+ fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+ unimplemented!()
+ }
+ /// Display implementation when using as a DataSink
+ fn stream_write_display(
+ &self,
+ t: DisplayFormatType,
+ f: &mut Formatter,
+ ) -> std::fmt::Result;
+}
+
+/// Stream data from the file at `location`
+///
+/// * Data will be read sequentially from the provided `location`
+/// * New data will be appended to the end of the file
+///
+/// The encoding can be configured with [`Self::with_encoding`] and
+/// defaults to [`StreamEncoding::Csv`]
#[derive(Debug)]
-pub struct StreamConfig {
- schema: SchemaRef,
+pub struct FileStreamProvider {
location: PathBuf,
- batch_size: usize,
encoding: StreamEncoding,
+ /// Get a reference to the schema for this file stream
+ pub schema: SchemaRef,
header: bool,
- order: Vec<Vec<Expr>>,
- constraints: Constraints,
+ batch_size: usize,
}
-impl StreamConfig {
+impl FileStreamProvider {
/// Stream data from the file at `location`
///
/// * Data will be read sequentially from the provided `location`
@@ -129,19 +156,11 @@ impl StreamConfig {
location,
batch_size: 1024,
encoding: StreamEncoding::Csv,
- order: vec![],
header: false,
- constraints: Constraints::empty(),
}
}
- /// Specify a sort order for the stream
- pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
- self.order = order;
- self
- }
-
- /// Specify the batch size
+ /// Set the batch size (the number of rows to load at one time)
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
@@ -158,11 +177,11 @@ impl StreamConfig {
self.encoding = encoding;
self
}
+}
- /// Assign constraints
- pub fn with_constraints(mut self, constraints: Constraints) -> Self {
- self.constraints = constraints;
- self
+impl StreamProvider for FileStreamProvider {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
}
fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
@@ -210,6 +229,58 @@ impl StreamConfig {
}
}
}
+
+ fn stream_write_display(
+ &self,
+ _t: DisplayFormatType,
+ f: &mut Formatter,
+ ) -> std::fmt::Result {
+ f.debug_struct("StreamWrite")
+ .field("location", &self.location)
+ .field("batch_size", &self.batch_size)
+ .field("encoding", &self.encoding)
+ .field("header", &self.header)
+ .finish_non_exhaustive()
+ }
+}
+
+/// The configuration for a [`StreamTable`]
+#[derive(Debug)]
+pub struct StreamConfig {
+ source: Arc<dyn StreamProvider>,
+ order: Vec<Vec<Expr>>,
+ constraints: Constraints,
+}
+
+impl StreamConfig {
+ /// Create a new `StreamConfig` from a `StreamProvider`
+ pub fn new(source: Arc<dyn StreamProvider>) -> Self {
+ Self {
+ source,
+ order: vec![],
+ constraints: Constraints::empty(),
+ }
+ }
+
+ /// Specify a sort order for the stream
+ pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
+ self.order = order;
+ self
+ }
+
+ /// Assign constraints
+ pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+ self.constraints = constraints;
+ self
+ }
+
+ fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
+ self.source.reader()
+ }
+
+ fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+ self.source.writer()
+ }
}
/// A [`TableProvider`] for an unbounded stream source
@@ -238,7 +309,7 @@ impl TableProvider for StreamTable {
}
fn schema(&self) -> SchemaRef {
- self.0.schema.clone()
+ self.0.source.schema().clone()
}
fn constraints(&self) -> Option<&Constraints> {
@@ -258,14 +329,14 @@ impl TableProvider for StreamTable {
) -> Result<Arc<dyn ExecutionPlan>> {
let projected_schema = match projection {
Some(p) => {
- let projected = self.0.schema.project(p)?;
+ let projected = self.0.source.schema().project(p)?;
create_ordering(&projected, &self.0.order)?
}
- None => create_ordering(self.0.schema.as_ref(), &self.0.order)?,
+ None => create_ordering(self.0.source.schema(), &self.0.order)?,
};
Ok(Arc::new(StreamingTableExec::try_new(
- self.0.schema.clone(),
+ self.0.source.schema().clone(),
vec![Arc::new(StreamRead(self.0.clone())) as _],
projection,
projected_schema,
@@ -282,7 +353,7 @@ impl TableProvider for StreamTable {
) -> Result<Arc<dyn ExecutionPlan>> {
let ordering = match self.0.order.first() {
Some(x) => {
- let schema = self.0.schema.as_ref();
+ let schema = self.0.source.schema();
let orders = create_ordering(schema, std::slice::from_ref(x))?;
let ordering = orders.into_iter().next().unwrap();
Some(ordering.into_iter().map(Into::into).collect())
@@ -293,7 +364,7 @@ impl TableProvider for StreamTable {
Ok(Arc::new(DataSinkExec::new(
input,
Arc::new(StreamWrite(self.0.clone())),
- self.0.schema.clone(),
+ self.0.source.schema().clone(),
ordering,
)))
}
@@ -303,12 +374,12 @@ struct StreamRead(Arc<StreamConfig>);
impl PartitionStream for StreamRead {
fn schema(&self) -> &SchemaRef {
- &self.0.schema
+ self.0.source.schema()
}
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let config = self.0.clone();
- let schema = self.0.schema.clone();
+ let schema = self.0.source.schema().clone();
let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
let tx = builder.tx();
builder.spawn_blocking(move || {
@@ -329,12 +400,7 @@ struct StreamWrite(Arc<StreamConfig>);
impl DisplayAs for StreamWrite {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
- f.debug_struct("StreamWrite")
- .field("location", &self.0.location)
- .field("batch_size", &self.0.batch_size)
- .field("encoding", &self.0.encoding)
- .field("header", &self.0.header)
- .finish_non_exhaustive()
+ self.0.source.stream_write_display(_t, f)
}
}
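For sources other than files, the new `StreamProvider` trait can be implemented directly. The following is a hypothetical in-memory implementation (not part of this commit) that simply replays buffered batches, written against the trait added in the stream.rs diff above; a real source would read from a socket, FIFO, or message queue inside `reader`.

use std::fmt::Formatter;

use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::SchemaRef;
use datafusion::datasource::stream::StreamProvider;
use datafusion::error::Result;
use datafusion::physical_plan::DisplayFormatType;

#[derive(Debug)]
struct InMemoryStreamProvider {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
}

impl StreamProvider for InMemoryStreamProvider {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // Yield the buffered batches one by one.
    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
        let iter = self.batches.clone().into_iter().map(Ok);
        Ok(Box::new(RecordBatchIterator::new(iter, self.schema.clone())))
    }

    // `writer` has a default implementation (`unimplemented!()`), so a
    // read-only source can omit it.

    fn stream_write_display(
        &self,
        _t: DisplayFormatType,
        f: &mut Formatter,
    ) -> std::fmt::Result {
        f.debug_struct("InMemoryStreamWrite").finish_non_exhaustive()
    }
}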
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index cfd0312f81..5895c39a5f 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
-use crate::datasource::stream::{StreamConfig, StreamTable};
+use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use crate::error::Result;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
@@ -62,7 +62,8 @@ async fn register_current_csv(
match infinite {
true => {
- let config = StreamConfig::new_file(schema, path.into());
+ let source = FileStreamProvider::new_file(schema, path.into());
+ let config = StreamConfig::new(Arc::new(source));
ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
}
false => {
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 7aec66825d..e876cfe465 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -31,7 +31,7 @@ use std::task::{Context, Poll};
use crate::dataframe::DataFrame;
use crate::datasource::provider::TableProviderFactory;
-use crate::datasource::stream::{StreamConfig, StreamTable};
+use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
@@ -355,8 +355,8 @@ pub fn register_unbounded_file_with_ordering(
table_name: &str,
file_sort_order: Vec<Vec<Expr>>,
) -> Result<()> {
- let config =
- StreamConfig::new_file(schema, file_path.into()).with_order(file_sort_order);
+ let source = FileStreamProvider::new_file(schema, file_path.into());
+ let config = StreamConfig::new(Arc::new(source)).with_order(file_sort_order);
// Register table:
ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
diff --git a/datafusion/core/tests/fifo/mod.rs b/datafusion/core/tests/fifo/mod.rs
index a63240d03d..2e21abffab 100644
--- a/datafusion/core/tests/fifo/mod.rs
+++ b/datafusion/core/tests/fifo/mod.rs
@@ -39,7 +39,7 @@ mod unix_test {
use tempfile::TempDir;
use tokio::task::{spawn_blocking, JoinHandle};
- use datafusion::datasource::stream::{StreamConfig, StreamTable};
+ use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::datasource::TableProvider;
use datafusion::{
prelude::{CsvReadOptions, SessionConfig, SessionContext},
@@ -54,10 +54,10 @@ mod unix_test {
path: impl Into<PathBuf>,
sort: Vec<Vec<Expr>>,
) -> Arc<dyn TableProvider> {
- let config = StreamConfig::new_file(schema, path.into())
- .with_order(sort)
+ let source = FileStreamProvider::new_file(schema, path.into())
.with_batch_size(TEST_BATCH_SIZE)
.with_header(true);
+ let config = StreamConfig::new(Arc::new(source)).with_order(sort);
Arc::new(StreamTable::new(Arc::new(config)))
}
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index f7d5205db0..fad9b94b01 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::datasource::stream::{StreamConfig, StreamTable};
+use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::test_util::register_unbounded_file_with_ordering;
use super::*;
@@ -166,12 +166,14 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
Field::new("a1", DataType::UInt32, false),
Field::new("a2", DataType::UInt32, false),
]));
- let left = StreamConfig::new_file(schema.clone(), left_file_path);
+ let left_source = FileStreamProvider::new_file(schema.clone(), left_file_path);
+ let left = StreamConfig::new(Arc::new(left_source));
ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
let right_file_path = tmp_dir.path().join("right.csv");
File::create(right_file_path.clone())?;
- let right = StreamConfig::new_file(schema, right_file_path);
+ let right_source = FileStreamProvider::new_file(schema, right_file_path);
+ let right = StreamConfig::new(Arc::new(right_source));
ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
let dataframe = ctx.sql(sql).await?;
@@ -216,11 +218,13 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
Field::new("a1", DataType::UInt32, false),
Field::new("a2", DataType::UInt32, false),
]));
- let left = StreamConfig::new_file(schema.clone(), left_file_path);
+ let left_source = FileStreamProvider::new_file(schema.clone(), left_file_path);
+ let left = StreamConfig::new(Arc::new(left_source));
ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
let right_file_path = tmp_dir.path().join("right.csv");
File::create(right_file_path.clone())?;
- let right = StreamConfig::new_file(schema.clone(), right_file_path);
+ let right_source = FileStreamProvider::new_file(schema.clone(), right_file_path);
+ let right = StreamConfig::new(Arc::new(right_source));
ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL
JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 +
10").await?;
match df.create_physical_plan().await {