jayzhan211 commented on code in PR #13540:
URL: https://github.com/apache/datafusion/pull/13540#discussion_r1857677256


##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -365,8 +365,165 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+pub trait StreamingBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
+    /// Generate the next batch, return `None` when no more batches are 
available
+    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
+
+    /// Creates a boxed clone of this generator.
+    ///
+    /// This method is required because `Clone` cannot be directly implemented 
for
+    /// trait objects. It provides a way to clone trait objects of
+    /// StreamingBatchGenerator while maintaining proper type erasure.
+    fn clone_box(&self) -> Box<dyn StreamingBatchGenerator>;
+}
+
+/// Execution plan for streaming in-memory batches of data
+///
+/// This plan generates output batches lazily, it doesn't have to buffer all 
batches
+/// in memory up front (compared to `MemoryExec`), thus consuming constant 
memory.
+pub struct StreamingMemoryExec {
+    /// Schema representing the data
+    schema: SchemaRef,
+    /// Functions to generate batches for each partition
+    batch_generators: Vec<Box<dyn StreamingBatchGenerator>>,
+    /// Total number of rows to generate for statistics
+    cache: PlanProperties,
+}
+
+impl StreamingMemoryExec {
+    /// Create a new streaming memory execution plan
+    pub fn try_new(
+        schema: SchemaRef,
+        generators: Vec<Box<dyn StreamingBatchGenerator>>,
+    ) -> Result<Self> {
+        let cache = PlanProperties::new(
+            EquivalenceProperties::new(Arc::clone(&schema)),
+            Partitioning::RoundRobinBatch(generators.len()),
+            ExecutionMode::Bounded,
+        );
+        Ok(Self {
+            schema,
+            batch_generators: generators,
+            cache,
+        })
+    }
+}
+
+impl fmt::Debug for StreamingMemoryExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("StreamingMemoryExec")
+            .field("schema", &self.schema)
+            .field("batch_generators", &self.batch_generators)
+            .finish()
+    }
+}
+
+impl DisplayAs for StreamingMemoryExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "StreamingMemoryExec: partitions={}, 
batch_generators=[{}]",
+                    self.batch_generators.len(),
+                    self.batch_generators
+                        .iter()
+                        .map(|g| g.to_string())
+                        .collect::<Vec<_>>()
+                        .join(", ")
+                )
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for StreamingMemoryExec {
+    fn name(&self) -> &'static str {
+        "StreamingMemoryExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("Children cannot be replaced in StreamingMemoryExec")
+        }
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        if partition >= self.batch_generators.len() {
+            return internal_err!(
+                "Invalid partition {} for StreamingMemoryExec with {} 
partitions",
+                partition,
+                self.batch_generators.len()
+            );
+        }
+
+        Ok(Box::pin(StreamingMemoryStream {
+            schema: Arc::clone(&self.schema),
+            generator: self.batch_generators[partition].clone_box(),
+        }))
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&self.schema))
+    }
+}
+
+/// Stream that generates record batches on demand
+pub struct StreamingMemoryStream {
+    schema: SchemaRef,
+    generator: Box<dyn StreamingBatchGenerator>,

Review Comment:
   Should we use `Arc` here, given that it makes sense for a generator to be shared across threads?



##########
datafusion/sqllogictest/test_files/table_functions.slt:
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test generate_series table function
+
+query I rowsort
+SELECT * FROM generate_series(1, 5)
+----
+1
+2
+3
+4
+5
+
+query I rowsort
+SELECT * FROM generate_series(1, 1)
+----
+1
+
+query I rowsort
+SELECT * FROM generate_series(3, 6)
+----
+3
+4
+5
+6
+
+query I rowsort
+SELECT SUM(v1) FROM generate_series(1, 5) t1(v1)
+----
+15
+
+# Test generate_series with WHERE clause
+query I rowsort
+SELECT * FROM generate_series(1, 10) t1(v1) WHERE v1 % 2 = 0
+----
+10
+2
+4
+6
+8
+
+# Test generate_series with ORDER BY
+query I
+SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC
+----
+5
+4
+3
+2
+1
+
+# Test generate_series with LIMIT
+query I rowsort
+SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5
+----
+1
+2
+3
+4
+5
+
+# Test generate_series in subquery
+query I rowsort
+SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1))
+----
+11
+12
+13
+
+# Test generate_series with JOIN
+query II rowsort
+SELECT a.v1, b.v1 
+FROM generate_series(1, 3) a(v1)
+JOIN generate_series(2, 4) b(v1)
+ON a.v1 = b.v1 - 1
+----
+1 2
+2 3
+3 4
+
+query TT
+EXPLAIN SELECT * FROM generate_series(1, 5)
+----
+logical_plan TableScan: tmp_table projection=[value]
+physical_plan StreamingMemoryExec: partitions=1, 
batch_generators=[generate_series: start=1, end=5, batch_size=8192]
+
+query error DataFusion error: This feature is not implemented: End value must 
be greater than or equal to start value
+SELECT * FROM generate_series(5, 1)
+
+statement error DataFusion error: Error during planning: First argument must 
be an integer literal
+SELECT * FROM generate_series(NULL, 5)
+
+statement error DataFusion error: Error during planning: Second argument must 
be an integer literal
+SELECT * FROM generate_series(1, NULL)

Review Comment:
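   For reference, DuckDB returns an empty result (0 rows) rather than an error when an argument is NULL:
   ```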
   D SELECT * FROM generate_series(1, null);
   ┌─────────────────┐
   │ generate_series │
   │      int64      │
   ├─────────────────┤
   │     0 rows      │
   └─────────────────┘
   ```



##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -365,8 +365,165 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+pub trait StreamingBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
+    /// Generate the next batch, return `None` when no more batches are 
available
+    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
+
+    /// Creates a boxed clone of this generator.
+    ///
+    /// This method is required because `Clone` cannot be directly implemented 
for
+    /// trait objects. It provides a way to clone trait objects of
+    /// StreamingBatchGenerator while maintaining proper type erasure.
+    fn clone_box(&self) -> Box<dyn StreamingBatchGenerator>;
+}
+
+/// Execution plan for streaming in-memory batches of data
+///
+/// This plan generates output batches lazily, it doesn't have to buffer all 
batches
+/// in memory up front (compared to `MemoryExec`), thus consuming constant 
memory.
+pub struct StreamingMemoryExec {
+    /// Schema representing the data
+    schema: SchemaRef,
+    /// Functions to generate batches for each partition
+    batch_generators: Vec<Box<dyn StreamingBatchGenerator>>,

Review Comment:
   ```suggestion
       batch_generators: Vec<Arc<dyn StreamingBatchGenerator>>,
   ```



##########
datafusion/catalog/src/table.rs:
##########
@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>>;
 }
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Debug + Sync + Send {

Review Comment:
   ```suggestion
   /// Returns this function's name
       fn name(&self) -> &str;
   ```
   
   This is much more consistent with other UDFs



##########
datafusion/sqllogictest/test_files/table_functions.slt:
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test generate_series table function
+
+query I rowsort
+SELECT * FROM generate_series(1, 5)
+----
+1
+2
+3
+4
+5
+
+query I rowsort
+SELECT * FROM generate_series(1, 1)
+----
+1
+
+query I rowsort
+SELECT * FROM generate_series(3, 6)
+----
+3
+4
+5
+6
+
+query I rowsort
+SELECT SUM(v1) FROM generate_series(1, 5) t1(v1)
+----
+15
+
+# Test generate_series with WHERE clause
+query I rowsort
+SELECT * FROM generate_series(1, 10) t1(v1) WHERE v1 % 2 = 0
+----
+10
+2
+4
+6
+8
+
+# Test generate_series with ORDER BY
+query I
+SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC
+----
+5
+4
+3
+2
+1
+
+# Test generate_series with LIMIT
+query I rowsort
+SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5
+----
+1
+2
+3
+4
+5
+
+# Test generate_series in subquery
+query I rowsort
+SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1))
+----
+11
+12
+13
+
+# Test generate_series with JOIN
+query II rowsort
+SELECT a.v1, b.v1 
+FROM generate_series(1, 3) a(v1)
+JOIN generate_series(2, 4) b(v1)
+ON a.v1 = b.v1 - 1
+----
+1 2
+2 3
+3 4
+
+query TT
+EXPLAIN SELECT * FROM generate_series(1, 5)
+----
+logical_plan TableScan: tmp_table projection=[value]
+physical_plan StreamingMemoryExec: partitions=1, 
batch_generators=[generate_series: start=1, end=5, batch_size=8192]
+
+query error DataFusion error: This feature is not implemented: End value must 
be greater than or equal to start value
+SELECT * FROM generate_series(5, 1)
+
+statement error DataFusion error: Error during planning: First argument must 
be an integer literal
+SELECT * FROM generate_series(NULL, 5)

Review Comment:
   We probably should not return an error for this query; DuckDB returns an empty result instead:
   ```
   D SELECT * FROM generate_series(null, 5);
   ┌─────────────────┐
   │ generate_series │
   │      int64      │
   ├─────────────────┤
   │     0 rows      │
   └─────────────────┘
   ```



##########
datafusion/sqllogictest/test_files/table_functions.slt:
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test generate_series table function
+
+query I rowsort
+SELECT * FROM generate_series(1, 5)
+----
+1
+2
+3
+4
+5
+
+query I rowsort
+SELECT * FROM generate_series(1, 1)
+----
+1
+
+query I rowsort
+SELECT * FROM generate_series(3, 6)
+----
+3
+4
+5
+6
+
+query I rowsort
+SELECT SUM(v1) FROM generate_series(1, 5) t1(v1)
+----
+15
+
+# Test generate_series with WHERE clause
+query I rowsort
+SELECT * FROM generate_series(1, 10) t1(v1) WHERE v1 % 2 = 0
+----
+10
+2
+4
+6
+8
+
+# Test generate_series with ORDER BY
+query I
+SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC
+----
+5
+4
+3
+2
+1
+
+# Test generate_series with LIMIT
+query I rowsort
+SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5
+----
+1
+2
+3
+4
+5
+
+# Test generate_series in subquery
+query I rowsort
+SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1))
+----
+11
+12
+13
+
+# Test generate_series with JOIN
+query II rowsort
+SELECT a.v1, b.v1 
+FROM generate_series(1, 3) a(v1)
+JOIN generate_series(2, 4) b(v1)
+ON a.v1 = b.v1 - 1
+----
+1 2
+2 3
+3 4
+
+query TT
+EXPLAIN SELECT * FROM generate_series(1, 5)
+----
+logical_plan TableScan: tmp_table projection=[value]
+physical_plan StreamingMemoryExec: partitions=1, 
batch_generators=[generate_series: start=1, end=5, batch_size=8192]
+
+query error DataFusion error: This feature is not implemented: End value must 
be greater than or equal to start value
+SELECT * FROM generate_series(5, 1)
+
+statement error DataFusion error: Error during planning: First argument must 
be an integer literal
+SELECT * FROM generate_series(NULL, 5)
+
+statement error DataFusion error: Error during planning: Second argument must 
be an integer literal
+SELECT * FROM generate_series(1, NULL)
+
+statement error DataFusion error: Error during planning: generate_series 
expects 2 arguments

Review Comment:
   Likewise, DuckDB returns 0 rows (not an error) when the third argument is NULL:
   ```
   D SELECT * FROM generate_series(1, 5, null);
   ┌─────────────────┐
   │ generate_series │
   │      int64      │
   ├─────────────────┤
   │     0 rows      │
   └─────────────────┘
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to