This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 86740bfd3d Add generate_series() udtf (and introduce 'lazy' `MemoryExec`) (#13540)
86740bfd3d is described below
commit 86740bfd3d9831d6b7c1d0e1bf4a21d91598a0ac
Author: Yongting You <[email protected]>
AuthorDate: Tue Dec 3 02:43:53 2024 +0800
Add generate_series() udtf (and introduce 'lazy' `MemoryExec`) (#13540)
* Add generate_series() udtf
* license
* fix examples
* clippy
* comments
* singleton udtf init
* StreamingMemoryExec -> LazyMemoryExec
* use RwLock
* test udf+udtf generate_series() in the same sql
* CI
* CI
* small fixes
---
Cargo.toml | 2 +
datafusion-cli/Cargo.lock | 213 +++++++++-------
datafusion-cli/Cargo.toml | 1 +
datafusion-cli/src/functions.rs | 2 +-
datafusion-examples/Cargo.toml | 1 +
datafusion-examples/examples/simple_udtf.rs | 2 +-
datafusion/catalog/src/table.rs | 41 ++-
datafusion/core/Cargo.toml | 1 +
datafusion/core/src/datasource/function.rs | 63 -----
datafusion/core/src/datasource/mod.rs | 1 -
datafusion/core/src/execution/context/mod.rs | 9 +-
datafusion/core/src/execution/session_state.rs | 17 +-
.../core/src/execution/session_state_defaults.rs | 8 +-
datafusion/core/src/lib.rs | 5 +
.../user_defined/user_defined_table_functions.rs | 2 +-
datafusion/functions-table/Cargo.toml | 62 +++++
datafusion/functions-table/src/generate_series.rs | 180 +++++++++++++
datafusion/functions-table/src/lib.rs | 51 ++++
datafusion/physical-plan/src/memory.rs | 280 ++++++++++++++++++++-
.../sqllogictest/test_files/table_functions.slt | 142 +++++++++++
20 files changed, 921 insertions(+), 162 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 76bc50d59a..1ca6cdfdb1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ members = [
"datafusion/functions",
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
+ "datafusion/functions-table",
"datafusion/functions-nested",
"datafusion/functions-window",
"datafusion/functions-window-common",
@@ -110,6 +111,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "43.0.0" }
+datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" }
datafusion-macros = { path = "datafusion/macros", version = "43.0.0" }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 04b0b0d22c..f06a8b210b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -220,7 +220,7 @@ dependencies = [
"chrono",
"chrono-tz",
"half",
- "hashbrown 0.15.1",
+ "hashbrown 0.15.2",
"num",
]
@@ -406,9 +406,9 @@ dependencies = [
[[package]]
name = "async-compression"
-version = "0.4.17"
+version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857"
+checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
dependencies = [
"bzip2",
"flate2",
@@ -814,9 +814,9 @@ dependencies = [
[[package]]
name = "blake3"
-version = "1.5.4"
+version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7"
+checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e"
dependencies = [
"arrayref",
"arrayvec",
@@ -880,9 +880,9 @@ checksum =
"1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
-version = "1.8.0"
+version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
+checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
[[package]]
name = "bytes-utils"
@@ -917,9 +917,9 @@ dependencies = [
[[package]]
name = "cc"
-version = "1.2.1"
+version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47"
+checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc"
dependencies = [
"jobserver",
"libc",
@@ -1080,6 +1080,16 @@ dependencies = [
"libc",
]
+[[package]]
+name = "core-foundation"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@@ -1097,9 +1107,9 @@ dependencies = [
[[package]]
name = "cpufeatures"
-version = "0.2.15"
+version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6"
+checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3"
dependencies = [
"libc",
]
@@ -1209,6 +1219,7 @@ dependencies = [
"datafusion-functions",
"datafusion-functions-aggregate",
"datafusion-functions-nested",
+ "datafusion-functions-table",
"datafusion-functions-window",
"datafusion-optimizer",
"datafusion-physical-expr",
@@ -1265,6 +1276,7 @@ dependencies = [
"clap",
"ctor",
"datafusion",
+ "datafusion-catalog",
"dirs",
"env_logger",
"futures",
@@ -1446,6 +1458,29 @@ dependencies = [
"rand",
]
+[[package]]
+name = "datafusion-functions-table"
+version = "43.0.0"
+dependencies = [
+ "ahash",
+ "arrow",
+ "arrow-schema",
+ "async-trait",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "datafusion-physical-plan",
+ "half",
+ "indexmap",
+ "log",
+ "parking_lot",
+ "paste",
+]
+
[[package]]
name = "datafusion-functions-window"
version = "43.0.0"
@@ -1700,12 +1735,12 @@ checksum =
"5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
-version = "0.3.9"
+version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
+checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
dependencies = [
"libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -1972,9 +2007,9 @@ dependencies = [
[[package]]
name = "hashbrown"
-version = "0.15.1"
+version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3"
+checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "heck"
@@ -1988,12 +2023,6 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
-[[package]]
-name = "hermit-abi"
-version = "0.3.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
-
[[package]]
name = "hex"
version = "0.4.3"
@@ -2162,8 +2191,8 @@ dependencies = [
"http 1.1.0",
"hyper 1.5.1",
"hyper-util",
- "rustls 0.23.17",
- "rustls-native-certs 0.8.0",
+ "rustls 0.23.19",
+ "rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.0",
@@ -2358,7 +2387,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
- "hashbrown 0.15.1",
+ "hashbrown 0.15.2",
]
[[package]]
@@ -2390,9 +2419,9 @@ dependencies = [
[[package]]
name = "itoa"
-version = "1.0.13"
+version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2"
+checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "jobserver"
@@ -2405,9 +2434,9 @@ dependencies = [
[[package]]
name = "js-sys"
-version = "0.3.72"
+version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9"
+checksum = "fb15147158e79fd8b8afd0252522769c4f48725460b37338544d8379d94fc8f9"
dependencies = [
"wasm-bindgen",
]
@@ -2484,9 +2513,9 @@ dependencies = [
[[package]]
name = "libc"
-version = "0.2.164"
+version = "0.2.167"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f"
+checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc"
[[package]]
name = "libflate"
@@ -2546,9 +2575,9 @@ checksum =
"78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "litemap"
-version = "0.7.3"
+version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704"
+checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"
[[package]]
name = "lock_api"
@@ -2628,11 +2657,10 @@ dependencies = [
[[package]]
name = "mio"
-version = "1.0.2"
+version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
+checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
- "hermit-abi",
"libc",
"wasi",
"windows-sys 0.52.0",
@@ -2862,7 +2890,7 @@ dependencies = [
"flate2",
"futures",
"half",
- "hashbrown 0.15.1",
+ "hashbrown 0.15.2",
"lz4_flex",
"num",
"num-bigint",
@@ -3020,9 +3048,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.89"
+version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
+checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
dependencies = [
"unicode-ident",
]
@@ -3063,7 +3091,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash",
- "rustls 0.23.17",
+ "rustls 0.23.19",
"socket2",
"thiserror 2.0.3",
"tokio",
@@ -3081,7 +3109,7 @@ dependencies = [
"rand",
"ring",
"rustc-hash",
- "rustls 0.23.17",
+ "rustls 0.23.19",
"rustls-pki-types",
"slab",
"thiserror 2.0.3",
@@ -3259,8 +3287,8 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
- "rustls 0.23.17",
- "rustls-native-certs 0.8.0",
+ "rustls 0.23.19",
+ "rustls-native-certs 0.8.1",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"serde",
@@ -3378,9 +3406,9 @@ dependencies = [
[[package]]
name = "rustls"
-version = "0.23.17"
+version = "0.23.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e"
+checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1"
dependencies = [
"once_cell",
"ring",
@@ -3399,20 +3427,19 @@ dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.4",
"schannel",
- "security-framework",
+ "security-framework 2.11.1",
]
[[package]]
name = "rustls-native-certs"
-version = "0.8.0"
+version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a"
+checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
dependencies = [
"openssl-probe",
- "rustls-pemfile 2.2.0",
"rustls-pki-types",
"schannel",
- "security-framework",
+ "security-framework 3.0.1",
]
[[package]]
@@ -3538,7 +3565,20 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
dependencies = [
"bitflags 2.6.0",
- "core-foundation",
+ "core-foundation 0.9.4",
+ "core-foundation-sys",
+ "libc",
+ "security-framework-sys",
+]
+
+[[package]]
+name = "security-framework"
+version = "3.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8"
+dependencies = [
+ "bitflags 2.6.0",
+ "core-foundation 0.10.0",
"core-foundation-sys",
"libc",
"security-framework-sys",
@@ -3686,9 +3726,9 @@ checksum =
"1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
[[package]]
name = "socket2"
-version = "0.5.7"
+version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
+checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
dependencies = [
"libc",
"windows-sys 0.52.0",
@@ -3801,9 +3841,9 @@ checksum =
"13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
-version = "2.0.87"
+version = "2.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
+checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31"
dependencies = [
"proc-macro2",
"quote",
@@ -4009,7 +4049,7 @@ version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
- "rustls 0.23.17",
+ "rustls 0.23.19",
"rustls-pki-types",
"tokio",
]
@@ -4052,9 +4092,9 @@ checksum =
"8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
-version = "0.1.40"
+version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
+checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"pin-project-lite",
"tracing-attributes",
@@ -4063,9 +4103,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
-version = "0.1.27"
+version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
+checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
dependencies = [
"proc-macro2",
"quote",
@@ -4074,9 +4114,9 @@ dependencies = [
[[package]]
name = "tracing-core"
-version = "0.1.32"
+version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
+checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
dependencies = [
"once_cell",
]
@@ -4155,9 +4195,9 @@ checksum =
"8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
-version = "2.5.3"
+version = "2.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada"
+checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60"
dependencies = [
"form_urlencoded",
"idna",
@@ -4246,9 +4286,9 @@ checksum =
"9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
-version = "0.2.95"
+version = "0.2.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
+checksum = "21d3b25c3ea1126a2ad5f4f9068483c2af1e64168f847abe863a526b8dbfe00b"
dependencies = [
"cfg-if",
"once_cell",
@@ -4257,9 +4297,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
-version = "0.2.95"
+version = "0.2.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
+checksum = "52857d4c32e496dc6537646b5b117081e71fd2ff06de792e3577a150627db283"
dependencies = [
"bumpalo",
"log",
@@ -4272,21 +4312,22 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
-version = "0.4.45"
+version = "0.4.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b"
+checksum = "951fe82312ed48443ac78b66fa43eded9999f738f6022e67aead7b708659e49a"
dependencies = [
"cfg-if",
"js-sys",
+ "once_cell",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
-version = "0.2.95"
+version = "0.2.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
+checksum = "920b0ffe069571ebbfc9ddc0b36ba305ef65577c94b06262ed793716a1afd981"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -4294,9 +4335,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
-version = "0.2.95"
+version = "0.2.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
+checksum = "bf59002391099644be3524e23b781fa43d2be0c5aa0719a18c0731b9d195cab6"
dependencies = [
"proc-macro2",
"quote",
@@ -4307,9 +4348,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
-version = "0.2.95"
+version = "0.2.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
+checksum = "e5047c5392700766601942795a436d7d2599af60dcc3cc1248c9120bfb0827b0"
[[package]]
name = "wasm-streams"
@@ -4326,9 +4367,9 @@ dependencies = [
[[package]]
name = "web-sys"
-version = "0.3.72"
+version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112"
+checksum = "476364ff87d0ae6bfb661053a9104ab312542658c3d8f963b7ace80b6f9b26b9"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -4578,9 +4619,9 @@ dependencies = [
[[package]]
name = "yoke"
-version = "0.7.4"
+version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5"
+checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40"
dependencies = [
"serde",
"stable_deref_trait",
@@ -4590,9 +4631,9 @@ dependencies = [
[[package]]
name = "yoke-derive"
-version = "0.7.4"
+version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95"
+checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154"
dependencies = [
"proc-macro2",
"quote",
@@ -4623,18 +4664,18 @@ dependencies = [
[[package]]
name = "zerofrom"
-version = "0.1.4"
+version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55"
+checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
-version = "0.1.4"
+version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5"
+checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808"
dependencies = [
"proc-macro2",
"quote",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 784d47220c..743ec1b4a7 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -49,6 +49,7 @@ datafusion = { path = "../datafusion/core", version = "43.0.0", features = [
"unicode_expressions",
"compression",
] }
+datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" }
dirs = "5.0.1"
env_logger = "0.11"
futures = "0.3"
diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index c622463de0..d7ca48d638 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -24,13 +24,13 @@ use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::common::{plan_err, Column};
-use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
+use datafusion_catalog::TableFunctionImpl;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 05850e7b3a..d8aaad801e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -60,6 +60,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true, default-features = true, features = ["avro"] }
+datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions-window-common = { workspace = true }
diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs
index 6faa397ef6..f32560ede6 100644
--- a/datafusion-examples/examples/simple_udtf.rs
+++ b/datafusion-examples/examples/simple_udtf.rs
@@ -21,13 +21,13 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::Session;
-use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
+use datafusion_catalog::TableFunctionImpl;
use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{Expr, TableType};
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index d771930de2..b6752191d9 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -25,9 +25,11 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
+use datafusion_expr::Expr;
+
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
- CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
+ CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;
@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Debug + Sync + Send {
+ /// Create a table provider
+ fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
+}
+
+/// A table that uses a function to generate data
+#[derive(Debug)]
+pub struct TableFunction {
+ /// Name of the table function
+ name: String,
+ /// Function implementation
+ fun: Arc<dyn TableFunctionImpl>,
+}
+
+impl TableFunction {
+ /// Create a new table function
+ pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
+ Self { name, fun }
+ }
+
+ /// Get the name of the table function
+ pub fn name(&self) -> &str {
+ &self.name
+ }
+
+ /// Get the implementation of the table function
+ pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
+ &self.fun
+ }
+
+ /// Get the function implementation and generate a table
+ pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ self.fun.call(args)
+ }
+}
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 268e0fb17f..6c5a31e362 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -103,6 +103,7 @@ datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true, optional = true }
+datafusion-functions-table = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
diff --git a/datafusion/core/src/datasource/function.rs b/datafusion/core/src/datasource/function.rs
deleted file mode 100644
index 37ce59f820..0000000000
--- a/datafusion/core/src/datasource/function.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! A table that uses a function to generate data
-
-use super::TableProvider;
-
-use datafusion_common::Result;
-use datafusion_expr::Expr;
-
-use std::fmt::Debug;
-use std::sync::Arc;
-
-/// A trait for table function implementations
-pub trait TableFunctionImpl: Debug + Sync + Send {
- /// Create a table provider
- fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
-}
-
-/// A table that uses a function to generate data
-#[derive(Debug)]
-pub struct TableFunction {
- /// Name of the table function
- name: String,
- /// Function implementation
- fun: Arc<dyn TableFunctionImpl>,
-}
-
-impl TableFunction {
- /// Create a new table function
- pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
- Self { name, fun }
- }
-
- /// Get the name of the table function
- pub fn name(&self) -> &str {
- &self.name
- }
-
- /// Get the implementation of the table function
- pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
- &self.fun
- }
-
- /// Get the function implementation and generate a table
- pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
- self.fun.call(args)
- }
-}
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index ad369b75e1..7d3fe9ddd7 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -25,7 +25,6 @@ pub mod default_table_source;
pub mod dynamic_file;
pub mod empty;
pub mod file_format;
-pub mod function;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 8ec8349164..4cc3200df1 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -30,9 +30,8 @@ use crate::{
catalog_common::memory::MemorySchemaProvider,
catalog_common::MemoryCatalogProvider,
dataframe::DataFrame,
- datasource::{
- function::{TableFunction, TableFunctionImpl},
- listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
+ datasource::listing::{
+ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
},
datasource::{provider_as_source, MemTable, ViewTable},
error::{DataFusionError, Result},
@@ -74,7 +73,9 @@ use crate::datasource::dynamic_file::DynamicListTableFactory;
use crate::execution::session_state::SessionStateBuilder;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
-use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory};
+use datafusion_catalog::{
+ DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory,
+};
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index e2be3cf374..4ccad5ffd3 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -24,7 +24,6 @@ use crate::catalog_common::information_schema::{
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::cte_worktable::CteWorkTable;
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
-use crate::datasource::function::{TableFunction, TableFunctionImpl};
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::SessionStateDefaults;
@@ -33,7 +32,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
-use datafusion_catalog::Session;
+use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
@@ -1074,6 +1073,7 @@ impl SessionStateBuilder {
.with_scalar_functions(SessionStateDefaults::default_scalar_functions())
.with_aggregate_functions(SessionStateDefaults::default_aggregate_functions())
.with_window_functions(SessionStateDefaults::default_window_functions())
+ .with_table_function_list(SessionStateDefaults::default_table_functions())
}
/// Set the session id.
@@ -1188,6 +1188,19 @@ impl SessionStateBuilder {
self
}
+ /// Set the list of [`TableFunction`]s
+ pub fn with_table_function_list(
+ mut self,
+ table_functions: Vec<Arc<TableFunction>>,
+ ) -> Self {
+ let functions = table_functions
+ .into_iter()
+ .map(|f| (f.name().to_string(), f))
+ .collect();
+ self.table_functions = Some(functions);
+ self
+ }
+
/// Set the map of [`ScalarUDF`]s
pub fn with_scalar_functions(
mut self,
diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs
index 7ba332c520..106082bc7b 100644
--- a/datafusion/core/src/execution/session_state_defaults.rs
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -29,7 +29,8 @@ use crate::datasource::provider::DefaultTableFactory;
use crate::execution::context::SessionState;
#[cfg(feature = "nested_expressions")]
use crate::functions_nested;
-use crate::{functions, functions_aggregate, functions_window};
+use crate::{functions, functions_aggregate, functions_table, functions_window};
+use datafusion_catalog::TableFunction;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -119,6 +120,11 @@ impl SessionStateDefaults {
functions_window::all_default_window_functions()
}
+ /// returns the list of default [`TableFunction`]s
+ pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
+ functions_table::all_default_table_functions()
+ }
+
/// returns the list of default [`FileFormatFactory']'s
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index d049e774d7..a1b18b8bfe 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -773,6 +773,11 @@ pub mod functions_window {
pub use datafusion_functions_window::*;
}
+/// re-export of [`datafusion_functions_table`] crate
+pub mod functions_table {
+ // Glob re-export: every public item of the crate is reachable via this module
+ pub use datafusion_functions_table::*;
+}
+
/// re-export of variable provider for `@name` and `@@name` style runtime
values.
pub mod variable {
pub use datafusion_expr::var_provider::{VarProvider, VarType};
diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs
b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
index 0cc156866d..39f10ef11a 100644
--- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
@@ -21,7 +21,6 @@ use arrow::csv::ReaderBuilder;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
@@ -29,6 +28,7 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_catalog::Session;
+use datafusion_catalog::TableFunctionImpl;
use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue};
use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
use std::fs::File;
diff --git a/datafusion/functions-table/Cargo.toml
b/datafusion/functions-table/Cargo.toml
new file mode 100644
index 0000000000..f667bdde58
--- /dev/null
+++ b/datafusion/functions-table/Cargo.toml
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-functions-table"
+# Summary and search keywords describe what this crate ships: table functions
+# (UDTFs) such as `generate_series`.
+description = "Table functions (UDTFs) for the DataFusion query engine"
+keywords = ["datafusion", "table", "function", "udtf"]
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+rust-version = { workspace = true }
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_functions_table"
+path = "src/lib.rs"
+
+# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+ahash = { workspace = true }
+arrow = { workspace = true }
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-functions-aggregate-common = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+half = { workspace = true }
+indexmap = { workspace = true }
+log = { workspace = true }
+parking_lot = { workspace = true }
+paste = "1.0.14"
+
+[dev-dependencies]
+arrow = { workspace = true, features = ["test_utils"] }
+criterion = "0.5"
+rand = { workspace = true }
diff --git a/datafusion/functions-table/src/generate_series.rs
b/datafusion/functions-table/src/generate_series.rs
new file mode 100644
index 0000000000..ced43ea8f0
--- /dev/null
+++ b/datafusion/functions-table/src/generate_series.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::Int64Array;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_catalog::TableFunctionImpl;
+use datafusion_catalog::TableProvider;
+use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue};
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::ExecutionPlan;
+use parking_lot::RwLock;
+use std::fmt;
+use std::sync::Arc;
+
+/// Table that generates a series of integers from `start` (inclusive) to
+/// `end` (inclusive)
+///
+/// A bound is `None` when the corresponding argument was a `NULL` literal.
+#[derive(Debug, Clone)]
+struct GenerateSeriesTable {
+ /// Schema reported by the table provider
+ schema: SchemaRef,
+ // None if input is Null
+ start: Option<i64>,
+ // None if input is Null
+ end: Option<i64>,
+}
+
+/// Table state that generates a series of integers from `start` (inclusive)
+/// to `end` (inclusive)
+#[derive(Debug, Clone)]
+struct GenerateSeriesState {
+ /// Schema of every generated batch
+ schema: SchemaRef,
+ start: i64, // Kept for display
+ /// Last value of the series (inclusive)
+ end: i64,
+ /// Upper bound on the number of rows in one generated batch
+ batch_size: usize,
+
+ /// Tracks current position when generating table
+ current: i64,
+}
+
+/// One-line summary of the generator, shown in `EXPLAIN` plans
+impl fmt::Display for GenerateSeriesState {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // `schema` and `current` are not part of the summary.
+        let Self {
+            start,
+            end,
+            batch_size,
+            ..
+        } = self;
+        write!(
+            f,
+            "generate_series: start={}, end={}, batch_size={}",
+            start, end, batch_size
+        )
+    }
+}
+
+impl LazyBatchGenerator for GenerateSeriesState {
+    /// Produces the next batch of at most `batch_size` consecutive values,
+    /// starting at `current` and never going past `end` (inclusive).
+    ///
+    /// Returns `Ok(None)` once the series is exhausted. All position
+    /// arithmetic is overflow-safe, so a series ending at `i64::MAX`
+    /// terminates instead of panicking or wrapping around.
+    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        // Check if we've reached the end
+        if self.current > self.end {
+            return Ok(None);
+        }
+
+        // Number of rows for this batch. A batch size of 0 is treated as 1 so
+        // the generator always advances instead of yielding empty batches
+        // forever.
+        let rows = i64::try_from(self.batch_size).unwrap_or(i64::MAX).max(1);
+
+        // Construct batch. Saturating keeps `batch_end` valid when `current`
+        // is within `rows` of `i64::MAX`.
+        let batch_end = self.current.saturating_add(rows - 1).min(self.end);
+        let array = Int64Array::from_iter_values(self.current..=batch_end);
+        let batch =
+            RecordBatch::try_new(self.schema.clone(), vec![Arc::new(array)])?;
+
+        // Update current position for next batch
+        match batch_end.checked_add(1) {
+            Some(next) => self.current = next,
+            None => {
+                // `batch_end` is `i64::MAX`: no value of `current` can exceed
+                // `end`, so mark the series as exhausted by moving `end`
+                // below `current`. Note this changes the `end` shown by
+                // `Display` after the final batch has been produced.
+                self.current = i64::MAX;
+                self.end = i64::MIN;
+            }
+        }
+
+        Ok(Some(batch))
+    }
+}
+
+#[async_trait]
+impl TableProvider for GenerateSeriesTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Builds a [`LazyMemoryExec`] with a single generator for the series.
+    ///
+    /// Fails with a planning error when both bounds are present and
+    /// `start > end`. Projection, filters and limit are not used.
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let batch_size = state.config_options().execution.batch_size;
+
+        // Initial (start, end, current) of the generator state
+        let (start, end, current) = match (self.start, self.end) {
+            (Some(start), Some(end)) if start > end => {
+                return plan_err!(
+                    "End value must be greater than or equal to start value"
+                );
+            }
+            (Some(start), Some(end)) => (start, end, start),
+            // Either start or end is None: `current` begins past `end`, so
+            // the generator outputs 0 rows
+            _ => (0, 0, 1),
+        };
+
+        let generator = GenerateSeriesState {
+            schema: self.schema.clone(),
+            start,
+            end,
+            current,
+            batch_size,
+        };
+        let exec = LazyMemoryExec::try_new(
+            self.schema.clone(),
+            vec![Arc::new(RwLock::new(generator))],
+        )?;
+        Ok(Arc::new(exec))
+    }
+}
+
+/// Table function that builds a [`GenerateSeriesTable`] from two literal
+/// arguments (see its `TableFunctionImpl::call` implementation)
+#[derive(Debug)]
+pub struct GenerateSeriesFunc {}
+
+impl TableFunctionImpl for GenerateSeriesFunc {
+ // Check input `exprs` type and number. Input validity check (e.g. start
<= end)
+ // will be performed in `TableProvider::scan`
+ fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ // TODO: support 1 or 3 arguments following DuckDB:
+ // <https://duckdb.org/docs/sql/functions/list#generate_series>
+ if exprs.len() == 3 || exprs.len() == 1 {
+ return not_impl_err!("generate_series does not support 1 or 3
arguments");
+ }
+
+ if exprs.len() != 2 {
+ return plan_err!("generate_series expects 2 arguments");
+ }
+
+ let start = match &exprs[0] {
+ Expr::Literal(ScalarValue::Null) => None,
+ Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n),
+ _ => return plan_err!("First argument must be an integer literal"),
+ };
+
+ let end = match &exprs[1] {
+ Expr::Literal(ScalarValue::Null) => None,
+ Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n),
+ _ => return plan_err!("Second argument must be an integer
literal"),
+ };
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ DataType::Int64,
+ false,
+ )]));
+
+ Ok(Arc::new(GenerateSeriesTable { schema, start, end }))
+ }
+}
diff --git a/datafusion/functions-table/src/lib.rs
b/datafusion/functions-table/src/lib.rs
new file mode 100644
index 0000000000..9ea4c0c899
--- /dev/null
+++ b/datafusion/functions-table/src/lib.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod generate_series;
+
+use datafusion_catalog::TableFunction;
+use std::sync::Arc;
+
+/// Returns all default table functions
+///
+/// The list currently holds a single entry, `generate_series`.
+pub fn all_default_table_functions() -> Vec<Arc<TableFunction>> {
+ vec![generate_series()]
+}
+
+/// Creates a singleton instance of a table function
+/// - `$module`: A struct implementing `TableFunctionImpl` to create the
+///   function from
+/// - `$name`: The name to give to the created function
+///
+/// This is used to ensure creating the list of `TableFunction` only happens
+/// once.
+///
+/// The backing `static` lives inside the generated function, so the macro can
+/// be invoked more than once in the same module without name clashes. All
+/// types are referenced by absolute path, so the call site needs no `use`
+/// items; the calling crate must depend on `paste` and `datafusion_catalog`.
+#[macro_export]
+macro_rules! create_udtf_function {
+    ($module:path, $name:expr) => {
+        paste::paste! {
+            pub fn [<$name:lower>]()
+                -> ::std::sync::Arc<::datafusion_catalog::TableFunction>
+            {
+                static INSTANCE: ::std::sync::OnceLock<
+                    ::std::sync::Arc<::datafusion_catalog::TableFunction>,
+                > = ::std::sync::OnceLock::new();
+
+                ::std::sync::Arc::clone(INSTANCE.get_or_init(|| {
+                    ::std::sync::Arc::new(
+                        ::datafusion_catalog::TableFunction::new(
+                            $name.to_string(),
+                            ::std::sync::Arc::new($module {}),
+                        ),
+                    )
+                }))
+            }
+        }
+    };
+}
+
+create_udtf_function!(generate_series::GenerateSeriesFunc, "generate_series");
diff --git a/datafusion/physical-plan/src/memory.rs
b/datafusion/physical-plan/src/memory.rs
index 272dcdc95b..bf6294f5a5 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -17,6 +17,7 @@
//! Execution plan for reading in-memory batches of data
+use parking_lot::RwLock;
use std::any::Any;
use std::fmt;
use std::sync::Arc;
@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream {
}
}
+/// A source of [`RecordBatch`]es that produces one batch per call, on demand
+///
+/// The `Display` form of a generator is what `LazyMemoryExec` prints for it
+/// in its plan description.
+pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
+ /// Generate the next batch, return `None` when no more batches are
+ /// available
+ fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
+}
+
+/// Execution plan for lazy in-memory batches of data
+///
+/// This plan generates output batches lazily, it doesn't have to buffer all
+/// batches in memory up front (compared to `MemoryExec`), thus consuming
+/// constant memory.
+pub struct LazyMemoryExec {
+ /// Schema representing the data
+ schema: SchemaRef,
+ /// Functions to generate batches for each partition
+ batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
+ /// Plan properties cache storing equivalence properties, partitioning,
+ /// and execution mode
+ cache: PlanProperties,
+}
+
+impl LazyMemoryExec {
+    /// Create a new lazy memory execution plan
+    ///
+    /// The plan reports one partition per entry in `generators` and a
+    /// bounded execution mode.
+    pub fn try_new(
+        schema: SchemaRef,
+        generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
+    ) -> Result<Self> {
+        let partition_count = generators.len();
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+        let cache = PlanProperties::new(
+            eq_properties,
+            Partitioning::RoundRobinBatch(partition_count),
+            ExecutionMode::Bounded,
+        );
+
+        Ok(Self {
+            schema,
+            batch_generators: generators,
+            cache,
+        })
+    }
+}
+
+// Hand-written `Debug`: prints `schema` and `batch_generators`, leaving out
+// the `cache` field
+impl fmt::Debug for LazyMemoryExec {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("LazyMemoryExec")
+ .field("schema", &self.schema)
+ .field("batch_generators", &self.batch_generators)
+ .finish()
+ }
+}
+
+impl DisplayAs for LazyMemoryExec {
+    /// Writes the partition count followed by the `Display` form of every
+    /// batch generator (same output for `Default` and `Verbose`).
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut fmt::Formatter,
+    ) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                // Each generator is read-locked only while it is rendered
+                let rendered: Vec<String> = self
+                    .batch_generators
+                    .iter()
+                    .map(|generator| generator.read().to_string())
+                    .collect();
+                write!(
+                    f,
+                    "LazyMemoryExec: partitions={}, batch_generators=[{}]",
+                    self.batch_generators.len(),
+                    rendered.join(", ")
+                )
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for LazyMemoryExec {
+    fn name(&self) -> &'static str {
+        "LazyMemoryExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    /// This plan has no inputs
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    /// Accepts only an empty `children` list and then returns `self`
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            return internal_err!("Children cannot be replaced in LazyMemoryExec");
+        }
+        Ok(self)
+    }
+
+    /// Returns a stream backed by the generator of `partition`.
+    ///
+    /// The stream holds a clone of the generator's `Arc`, so state advanced
+    /// through one stream is also seen by any other stream created for the
+    /// same partition.
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let generator = match self.batch_generators.get(partition) {
+            Some(generator) => Arc::clone(generator),
+            None => {
+                return internal_err!(
+                    "Invalid partition {} for LazyMemoryExec with {} partitions",
+                    partition,
+                    self.batch_generators.len()
+                );
+            }
+        };
+
+        Ok(Box::pin(LazyMemoryStream {
+            schema: Arc::clone(&self.schema),
+            generator,
+        }))
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&self.schema))
+    }
+}
+
+/// Stream that generates record batches on demand
+pub struct LazyMemoryStream {
+ /// Schema returned from `RecordBatchStream::schema`
+ schema: SchemaRef,
+ /// Generator to produce batches
+ ///
+ /// Note: Idiomatically, DataFusion uses plan-time parallelism - each
+ /// stream should have a unique `LazyBatchGenerator`. Use RepartitionExec or
+ /// construct multiple `LazyMemoryStream`s during planning to enable
+ /// parallel execution.
+ /// Sharing generators between streams should be used with caution.
+ generator: Arc<RwLock<dyn LazyBatchGenerator>>,
+}
+
+impl Stream for LazyMemoryStream {
+    type Item = Result<RecordBatch>;
+
+    /// Takes the generator's write lock, asks for the next batch and returns
+    /// it immediately; this never yields `Poll::Pending`.
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        // `Ok(None)` ends the stream, while batches and errors become items
+        let next = self.generator.write().generate_next_batch().transpose();
+        Poll::Ready(next)
+    }
+}
+
+impl RecordBatchStream for LazyMemoryStream {
+ /// Returns a new reference to the schema stored on the stream
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+}
+
#[cfg(test)]
-mod tests {
+mod memory_exec_tests {
use std::sync::Arc;
use crate::memory::MemoryExec;
@@ -416,3 +574,123 @@ mod tests {
Ok(())
}
}
+
+#[cfg(test)]
+mod lazy_memory_tests {
+ use super::*;
+ use arrow::array::Int64Array;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use futures::StreamExt;
+
+ /// Test generator that emits `max_batches` batches of `batch_size`
+ /// consecutive integers each
+ #[derive(Debug, Clone)]
+ struct TestGenerator {
+ // Number of batches generated so far
+ counter: i64,
+ max_batches: i64,
+ batch_size: usize,
+ schema: SchemaRef,
+ }
+
+ // Required by the `LazyBatchGenerator` supertrait bounds
+ impl fmt::Display for TestGenerator {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "TestGenerator: counter={}, max_batches={}, batch_size={}",
+ self.counter, self.max_batches, self.batch_size
+ )
+ }
+ }
+
+    impl LazyBatchGenerator for TestGenerator {
+        /// Batch number `counter` holds the values
+        /// `counter * batch_size .. (counter + 1) * batch_size`
+        fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
+            if self.counter >= self.max_batches {
+                return Ok(None);
+            }
+
+            let rows = self.batch_size as i64;
+            let first = self.counter * rows;
+            let array = Int64Array::from_iter_values(first..(first + rows));
+            self.counter += 1;
+
+            let batch = RecordBatch::try_new(
+                Arc::clone(&self.schema),
+                vec![Arc::new(array)],
+            )?;
+            Ok(Some(batch))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_lazy_memory_exec() -> Result<()> {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
+        let generator = TestGenerator {
+            counter: 0,
+            max_batches: 3,
+            batch_size: 2,
+            schema: Arc::clone(&schema),
+        };
+
+        let exec = LazyMemoryExec::try_new(
+            schema,
+            vec![Arc::new(RwLock::new(generator))],
+        )?;
+
+        // Test schema
+        let exec_schema = exec.schema();
+        assert_eq!(exec_schema.fields().len(), 1);
+        assert_eq!(exec_schema.field(0).name(), "a");
+
+        // Test execution
+        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
+        let batches: Vec<_> = stream.collect::<Vec<_>>().await;
+
+        assert_eq!(batches.len(), 3);
+
+        // Verify batch contents: three batches of two consecutive values
+        let expected: [[i64; 2]; 3] = [[0, 1], [2, 3], [4, 5]];
+        for (batch, want) in batches.iter().zip(expected.iter()) {
+            let batch = batch.as_ref().unwrap();
+            let array = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap();
+            assert_eq!(array.values(), want);
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_lazy_memory_exec_invalid_partition() -> Result<()> {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
+        let generator = TestGenerator {
+            counter: 0,
+            max_batches: 1,
+            batch_size: 1,
+            schema: Arc::clone(&schema),
+        };
+
+        let exec = LazyMemoryExec::try_new(
+            schema,
+            vec![Arc::new(RwLock::new(generator))],
+        )?;
+
+        // Test invalid partition
+        let result = exec.execute(1, Arc::new(TaskContext::default()));
+
+        // partition is 0-indexed, so there only should be partition 0
+        let expected = "Invalid partition 1 for LazyMemoryExec with 1 partitions";
+        assert!(matches!(
+            result,
+            Err(e) if e.to_string().contains(expected)
+        ));
+
+        Ok(())
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/table_functions.slt
b/datafusion/sqllogictest/test_files/table_functions.slt
new file mode 100644
index 0000000000..12402e0d70
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/table_functions.slt
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test generate_series table function
+
+query I rowsort
+SELECT * FROM generate_series(1, 5)
+----
+1
+2
+3
+4
+5
+
+query I rowsort
+SELECT * FROM generate_series(1, 1)
+----
+1
+
+query I rowsort
+SELECT * FROM generate_series(3, 6)
+----
+3
+4
+5
+6
+
+query I rowsort
+SELECT SUM(v1) FROM generate_series(1, 5) t1(v1)
+----
+15
+
+# Test generate_series with WHERE clause
+query I rowsort
+SELECT * FROM generate_series(1, 10) t1(v1) WHERE v1 % 2 = 0
+----
+10
+2
+4
+6
+8
+
+# Test generate_series with ORDER BY
+query I
+SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC
+----
+5
+4
+3
+2
+1
+
+# Test generate_series with LIMIT
+# No ORDER BY here: the expected rows rely on the scan emitting values in
+# ascending order (presumably from a single partition; confirm if the planner
+# ever splits this scan)
+query I rowsort
+SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5
+----
+1
+2
+3
+4
+5
+
+# Test generate_series in subquery
+query I rowsort
+SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1))
+----
+11
+12
+13
+
+# Test generate_series with JOIN
+query II rowsort
+SELECT a.v1, b.v1
+FROM generate_series(1, 3) a(v1)
+JOIN generate_series(2, 4) b(v1)
+ON a.v1 = b.v1 - 1
+----
+1 2
+2 3
+3 4
+
+query I
+SELECT * FROM generate_series(NULL, 5)
+----
+
+query I
+SELECT * FROM generate_series(1, NULL)
+----
+
+query I
+SELECT * FROM generate_series(NULL, NULL)
+----
+
+query TT
+EXPLAIN SELECT * FROM generate_series(1, 5)
+----
+logical_plan TableScan: tmp_table projection=[value]
+physical_plan LazyMemoryExec: partitions=1, batch_generators=[generate_series:
start=1, end=5, batch_size=8192]
+
+#
+# Test generate_series with invalid arguments
+#
+
+query error DataFusion error: Error during planning: End value must be greater
than or equal to start value
+SELECT * FROM generate_series(5, 1)
+
+statement error DataFusion error: This feature is not implemented:
generate_series does not support 1 or 3 arguments
+SELECT * FROM generate_series(1, 5, NULL)
+
+statement error DataFusion error: This feature is not implemented:
generate_series does not support 1 or 3 arguments
+SELECT * FROM generate_series(1)
+
+statement error DataFusion error: Error during planning: generate_series
expects 2 arguments
+SELECT * FROM generate_series(1, 2, 3, 4)
+
+statement error DataFusion error: Error during planning: Second argument must
be an integer literal
+SELECT * FROM generate_series(1, '2')
+
+statement error DataFusion error: Error during planning: First argument must
be an integer literal
+SELECT * FROM generate_series('foo', 'bar')
+
+# UDF and UDTF `generate_series` can be used simultaneously
+query ? rowsort
+SELECT generate_series(1, t1.end) FROM generate_series(3, 5) as t1(end)
+----
+[1, 2, 3, 4, 5]
+[1, 2, 3, 4]
+[1, 2, 3]
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]