alamb commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861245271
##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn user_defined_table_function() -> Result<()> {
+ let mut ctx = SessionContext::new();
+
+ let integer_series = integer_udtf();
+ ctx.register_udtf(create_udtf(
+ "integer_series",
+ vec![DataType::Int64, DataType::Int64],
+ Arc::new(DataType::Int64),
+ Volatility::Immutable,
+ integer_series,
+ ));
+
+ let struct_func = struct_udtf();
+ ctx.register_udtf(create_udtf(
+ "struct_func",
+ vec![DataType::Int64],
+ Arc::new(DataType::Struct(
+ [
+ Field::new("f1", DataType::Utf8, false),
+ Field::new("f2", DataType::Int64, false),
+ ]
+ .to_vec(),
+ )),
+ Volatility::Immutable,
+ struct_func,
+ ));
+
+ let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+ let expected = vec![
+ "+-------------------------+",
+ "| struct_func(Int64(5)) |",
+ "+-------------------------+",
+ "| {\"f1\": \"test\", \"f2\": 5} |",
+ "+-------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &result);
+
+ let result = plan_and_collect(&ctx, "SELECT
integer_series(6,5)").await?;
+
+ let expected = vec![
+ "+-----------------------------------+",
+ "| integer_series(Int64(6),Int64(5)) |",
+ "+-----------------------------------+",
+ "+-----------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &result);
+
+ let result = plan_and_collect(&ctx, "SELECT
integer_series(1,5)").await?;
+
+ let expected = vec![
+ "+-----------------------------------+",
+ "| integer_series(Int64(1),Int64(5)) |",
+ "+-----------------------------------+",
+ "| 1 |",
+ "| 2 |",
+ "| 3 |",
+ "| 4 |",
+ "| 5 |",
+ "+-----------------------------------+",
+ ];
Review Comment:
This is a good example of a `UDTF` producing more rows than went in 👍
Would it be possible to write an example that also produces a different
number of *columns* than went in? I think that is what @Ted-Jiang and I are
pointing out in our comments below.
##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn user_defined_table_function() -> Result<()> {
+ let mut ctx = SessionContext::new();
+
+ let integer_series = integer_udtf();
+ ctx.register_udtf(create_udtf(
+ "integer_series",
+ vec![DataType::Int64, DataType::Int64],
+ Arc::new(DataType::Int64),
+ Volatility::Immutable,
+ integer_series,
+ ));
+
+ let struct_func = struct_udtf();
+ ctx.register_udtf(create_udtf(
+ "struct_func",
+ vec![DataType::Int64],
+ Arc::new(DataType::Struct(
+ [
+ Field::new("f1", DataType::Utf8, false),
+ Field::new("f2", DataType::Int64, false),
+ ]
+ .to_vec(),
+ )),
+ Volatility::Immutable,
+ struct_func,
+ ));
+
+ let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+ let expected = vec![
+ "+-------------------------+",
+ "| struct_func(Int64(5)) |",
+ "+-------------------------+",
+ "| {\"f1\": \"test\", \"f2\": 5} |",
+ "+-------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &result);
+
+ let result = plan_and_collect(&ctx, "SELECT
integer_series(6,5)").await?;
+
+ let expected = vec![
+ "+-----------------------------------+",
+ "| integer_series(Int64(6),Int64(5)) |",
+ "+-----------------------------------+",
+ "+-----------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &result);
+
+ let result = plan_and_collect(&ctx, "SELECT
integer_series(1,5)").await?;
+
+ let expected = vec![
+ "+-----------------------------------+",
+ "| integer_series(Int64(1),Int64(5)) |",
+ "+-----------------------------------+",
+ "| 1 |",
+ "| 2 |",
+ "| 3 |",
+ "| 4 |",
+ "| 5 |",
+ "+-----------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &result);
+
+ let result = plan_and_collect(
+ &ctx,
+ "SELECT asd, struct_func(qwe), integer_series(asd, qwe),
integer_series(1, qwe) r FROM (select 1 asd, 3 qwe UNION ALL select 2 asd, 4
qwe) x",
+ )
+ .await?;
+
+ let expected = vec![
+
"+-----+-------------------------+-----------------------------+---+",
+ "| asd | struct_func(x.qwe) | integer_series(x.asd,x.qwe) | r
|",
+
"+-----+-------------------------+-----------------------------+---+",
+ "| 1 | {\"f1\": \"test\", \"f2\": 3} | 1
| 1 |",
+ "| 1 | | 2 | 2
|",
+ "| 1 | | 3 | 3
|",
+ "| 2 | {\"f1\": \"test\", \"f2\": 4} | 2
| 1 |",
+ "| 2 | | 3 | 2
|",
+ "| 2 | | 4 | 3
|",
+ "| 2 | | | 4
|",
+
"+-----+-------------------------+-----------------------------+---+",
+ ];
+
+ assert_batches_eq!(expected, &result);
+
+ let result =
+ plan_and_collect(&ctx, "SELECT * from integer_series(1,5)
pos(n)").await?;
Review Comment:
Can you explain what this test is supposed to be demonstrating? I am not
quite sure what it shows
##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
pub type ScalarFunctionImplementation =
Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+ Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> +
Send + Sync>;
Review Comment:
I am also a little mystified by this signature. It looks like "Second tuple"
was the start of a thought that didn't get finished? I also don't understand
what the `usize` in the tuple represents -- perhaps you can add some comments
explaining its purpose?
Also, I agree with @Ted-Jiang's analysis -- I would expect this signature
to return a "table" (aka a `RecordBatch`, or a `Vec<ColumnarValue>` if preferred).
Perhaps something like:
```rust
Arc<dyn Fn(&[ColumnarValue]) -> Result<RecordBatch> + Send + Sync>;
```
or
```rust
Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]