This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 86740bfd3d Add generate_series() udtf (and introduce 'lazy' `MemoryExec`) (#13540)
86740bfd3d is described below

commit 86740bfd3d9831d6b7c1d0e1bf4a21d91598a0ac
Author: Yongting You <[email protected]>
AuthorDate: Tue Dec 3 02:43:53 2024 +0800

    Add generate_series() udtf (and introduce 'lazy' `MemoryExec`) (#13540)
    
    * Add generate_series() udtf
    
    * license
    
    * fix examples
    
    * clippy
    
    * comments
    
    * singleton udtf init
    
    * StreamingMemoryExec -> LazyMemoryExec
    
    * use RwLock
    
    * test udf+udtf generate_series() in the same sql
    
    * CI
    
    * CI
    
    * small fixes
---
 Cargo.toml                                         |   2 +
 datafusion-cli/Cargo.lock                          | 213 +++++++++-------
 datafusion-cli/Cargo.toml                          |   1 +
 datafusion-cli/src/functions.rs                    |   2 +-
 datafusion-examples/Cargo.toml                     |   1 +
 datafusion-examples/examples/simple_udtf.rs        |   2 +-
 datafusion/catalog/src/table.rs                    |  41 ++-
 datafusion/core/Cargo.toml                         |   1 +
 datafusion/core/src/datasource/function.rs         |  63 -----
 datafusion/core/src/datasource/mod.rs              |   1 -
 datafusion/core/src/execution/context/mod.rs       |   9 +-
 datafusion/core/src/execution/session_state.rs     |  17 +-
 .../core/src/execution/session_state_defaults.rs   |   8 +-
 datafusion/core/src/lib.rs                         |   5 +
 .../user_defined/user_defined_table_functions.rs   |   2 +-
 datafusion/functions-table/Cargo.toml              |  62 +++++
 datafusion/functions-table/src/generate_series.rs  | 180 +++++++++++++
 datafusion/functions-table/src/lib.rs              |  51 ++++
 datafusion/physical-plan/src/memory.rs             | 280 ++++++++++++++++++++-
 .../sqllogictest/test_files/table_functions.slt    | 142 +++++++++++
 20 files changed, 921 insertions(+), 162 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 76bc50d59a..1ca6cdfdb1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ members = [
     "datafusion/functions",
     "datafusion/functions-aggregate",
     "datafusion/functions-aggregate-common",
+    "datafusion/functions-table",
     "datafusion/functions-nested",
     "datafusion/functions-window",
     "datafusion/functions-window-common",
@@ -110,6 +111,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" }
 datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" }
 datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" }
 datafusion-functions-nested = { path = "datafusion/functions-nested", version = "43.0.0" }
+datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" }
 datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" }
 datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" }
 datafusion-macros = { path = "datafusion/macros", version = "43.0.0" }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 04b0b0d22c..f06a8b210b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -220,7 +220,7 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "half",
- "hashbrown 0.15.1",
+ "hashbrown 0.15.2",
  "num",
 ]
 
@@ -406,9 +406,9 @@ dependencies = [
 
 [[package]]
 name = "async-compression"
-version = "0.4.17"
+version = "0.4.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857"
+checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
 dependencies = [
  "bzip2",
  "flate2",
@@ -814,9 +814,9 @@ dependencies = [
 
 [[package]]
 name = "blake3"
-version = "1.5.4"
+version = "1.5.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7"
+checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e"
 dependencies = [
  "arrayref",
  "arrayvec",
@@ -880,9 +880,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
 
 [[package]]
 name = "bytes"
-version = "1.8.0"
+version = "1.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
+checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
 
 [[package]]
 name = "bytes-utils"
@@ -917,9 +917,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.2.1"
+version = "1.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47"
+checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc"
 dependencies = [
  "jobserver",
  "libc",
@@ -1080,6 +1080,16 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "core-foundation"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
 [[package]]
 name = "core-foundation-sys"
 version = "0.8.7"
@@ -1097,9 +1107,9 @@ dependencies = [
 
 [[package]]
 name = "cpufeatures"
-version = "0.2.15"
+version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6"
+checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3"
 dependencies = [
  "libc",
 ]
@@ -1209,6 +1219,7 @@ dependencies = [
  "datafusion-functions",
  "datafusion-functions-aggregate",
  "datafusion-functions-nested",
+ "datafusion-functions-table",
  "datafusion-functions-window",
  "datafusion-optimizer",
  "datafusion-physical-expr",
@@ -1265,6 +1276,7 @@ dependencies = [
  "clap",
  "ctor",
  "datafusion",
+ "datafusion-catalog",
  "dirs",
  "env_logger",
  "futures",
@@ -1446,6 +1458,29 @@ dependencies = [
  "rand",
 ]
 
+[[package]]
+name = "datafusion-functions-table"
+version = "43.0.0"
+dependencies = [
+ "ahash",
+ "arrow",
+ "arrow-schema",
+ "async-trait",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions-aggregate-common",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "datafusion-physical-plan",
+ "half",
+ "indexmap",
+ "log",
+ "parking_lot",
+ "paste",
+]
+
 [[package]]
 name = "datafusion-functions-window"
 version = "43.0.0"
@@ -1700,12 +1735,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
 
 [[package]]
 name = "errno"
-version = "0.3.9"
+version = "0.3.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
+checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
 dependencies = [
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -1972,9 +2007,9 @@ dependencies = [
 
 [[package]]
 name = "hashbrown"
-version = "0.15.1"
+version = "0.15.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3"
+checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
 
 [[package]]
 name = "heck"
@@ -1988,12 +2023,6 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
-[[package]]
-name = "hermit-abi"
-version = "0.3.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
-
 [[package]]
 name = "hex"
 version = "0.4.3"
@@ -2162,8 +2191,8 @@ dependencies = [
  "http 1.1.0",
  "hyper 1.5.1",
  "hyper-util",
- "rustls 0.23.17",
- "rustls-native-certs 0.8.0",
+ "rustls 0.23.19",
+ "rustls-native-certs 0.8.1",
  "rustls-pki-types",
  "tokio",
  "tokio-rustls 0.26.0",
@@ -2358,7 +2387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
 dependencies = [
  "equivalent",
- "hashbrown 0.15.1",
+ "hashbrown 0.15.2",
 ]
 
 [[package]]
@@ -2390,9 +2419,9 @@ dependencies = [
 
 [[package]]
 name = "itoa"
-version = "1.0.13"
+version = "1.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2"
+checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
 
 [[package]]
 name = "jobserver"
@@ -2405,9 +2434,9 @@ dependencies = [
 
 [[package]]
 name = "js-sys"
-version = "0.3.72"
+version = "0.3.73"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9"
+checksum = "fb15147158e79fd8b8afd0252522769c4f48725460b37338544d8379d94fc8f9"
 dependencies = [
  "wasm-bindgen",
 ]
@@ -2484,9 +2513,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.164"
+version = "0.2.167"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f"
+checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc"
 
 [[package]]
 name = "libflate"
@@ -2546,9 +2575,9 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
 
 [[package]]
 name = "litemap"
-version = "0.7.3"
+version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704"
+checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"
 
 [[package]]
 name = "lock_api"
@@ -2628,11 +2657,10 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "1.0.2"
+version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
+checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
 dependencies = [
- "hermit-abi",
  "libc",
  "wasi",
  "windows-sys 0.52.0",
@@ -2862,7 +2890,7 @@ dependencies = [
  "flate2",
  "futures",
  "half",
- "hashbrown 0.15.1",
+ "hashbrown 0.15.2",
  "lz4_flex",
  "num",
  "num-bigint",
@@ -3020,9 +3048,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.89"
+version = "1.0.92"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
+checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
 dependencies = [
  "unicode-ident",
 ]
@@ -3063,7 +3091,7 @@ dependencies = [
  "quinn-proto",
  "quinn-udp",
  "rustc-hash",
- "rustls 0.23.17",
+ "rustls 0.23.19",
  "socket2",
  "thiserror 2.0.3",
  "tokio",
@@ -3081,7 +3109,7 @@ dependencies = [
  "rand",
  "ring",
  "rustc-hash",
- "rustls 0.23.17",
+ "rustls 0.23.19",
  "rustls-pki-types",
  "slab",
  "thiserror 2.0.3",
@@ -3259,8 +3287,8 @@ dependencies = [
  "percent-encoding",
  "pin-project-lite",
  "quinn",
- "rustls 0.23.17",
- "rustls-native-certs 0.8.0",
+ "rustls 0.23.19",
+ "rustls-native-certs 0.8.1",
  "rustls-pemfile 2.2.0",
  "rustls-pki-types",
  "serde",
@@ -3378,9 +3406,9 @@ dependencies = [
 
 [[package]]
 name = "rustls"
-version = "0.23.17"
+version = "0.23.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e"
+checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1"
 dependencies = [
  "once_cell",
  "ring",
@@ -3399,20 +3427,19 @@ dependencies = [
  "openssl-probe",
  "rustls-pemfile 1.0.4",
  "schannel",
- "security-framework",
+ "security-framework 2.11.1",
 ]
 
 [[package]]
 name = "rustls-native-certs"
-version = "0.8.0"
+version = "0.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a"
+checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
 dependencies = [
  "openssl-probe",
- "rustls-pemfile 2.2.0",
  "rustls-pki-types",
  "schannel",
- "security-framework",
+ "security-framework 3.0.1",
 ]
 
 [[package]]
@@ -3538,7 +3565,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
 dependencies = [
  "bitflags 2.6.0",
- "core-foundation",
+ "core-foundation 0.9.4",
+ "core-foundation-sys",
+ "libc",
+ "security-framework-sys",
+]
+
+[[package]]
+name = "security-framework"
+version = "3.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8"
+dependencies = [
+ "bitflags 2.6.0",
+ "core-foundation 0.10.0",
  "core-foundation-sys",
  "libc",
  "security-framework-sys",
@@ -3686,9 +3726,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
 [[package]]
 name = "socket2"
-version = "0.5.7"
+version = "0.5.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
+checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
 dependencies = [
  "libc",
  "windows-sys 0.52.0",
@@ -3801,9 +3841,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
 
 [[package]]
 name = "syn"
-version = "2.0.87"
+version = "2.0.90"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
+checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4009,7 +4049,7 @@ version = "0.26.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
 dependencies = [
- "rustls 0.23.17",
+ "rustls 0.23.19",
  "rustls-pki-types",
  "tokio",
 ]
@@ -4052,9 +4092,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
 
 [[package]]
 name = "tracing"
-version = "0.1.40"
+version = "0.1.41"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
+checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
 dependencies = [
  "pin-project-lite",
  "tracing-attributes",
@@ -4063,9 +4103,9 @@ dependencies = [
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.27"
+version = "0.1.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
+checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4074,9 +4114,9 @@ dependencies = [
 
 [[package]]
 name = "tracing-core"
-version = "0.1.32"
+version = "0.1.33"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
+checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
 dependencies = [
  "once_cell",
 ]
@@ -4155,9 +4195,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
 
 [[package]]
 name = "url"
-version = "2.5.3"
+version = "2.5.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada"
+checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60"
 dependencies = [
  "form_urlencoded",
  "idna",
@@ -4246,9 +4286,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
 [[package]]
 name = "wasm-bindgen"
-version = "0.2.95"
+version = "0.2.96"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
+checksum = "21d3b25c3ea1126a2ad5f4f9068483c2af1e64168f847abe863a526b8dbfe00b"
 dependencies = [
  "cfg-if",
  "once_cell",
@@ -4257,9 +4297,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-backend"
-version = "0.2.95"
+version = "0.2.96"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
+checksum = "52857d4c32e496dc6537646b5b117081e71fd2ff06de792e3577a150627db283"
 dependencies = [
  "bumpalo",
  "log",
@@ -4272,21 +4312,22 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-futures"
-version = "0.4.45"
+version = "0.4.46"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b"
+checksum = "951fe82312ed48443ac78b66fa43eded9999f738f6022e67aead7b708659e49a"
 dependencies = [
  "cfg-if",
  "js-sys",
+ "once_cell",
  "wasm-bindgen",
  "web-sys",
 ]
 
 [[package]]
 name = "wasm-bindgen-macro"
-version = "0.2.95"
+version = "0.2.96"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
+checksum = "920b0ffe069571ebbfc9ddc0b36ba305ef65577c94b06262ed793716a1afd981"
 dependencies = [
  "quote",
  "wasm-bindgen-macro-support",
@@ -4294,9 +4335,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-macro-support"
-version = "0.2.95"
+version = "0.2.96"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
+checksum = "bf59002391099644be3524e23b781fa43d2be0c5aa0719a18c0731b9d195cab6"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4307,9 +4348,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-shared"
-version = "0.2.95"
+version = "0.2.96"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
+checksum = "e5047c5392700766601942795a436d7d2599af60dcc3cc1248c9120bfb0827b0"
 
 [[package]]
 name = "wasm-streams"
@@ -4326,9 +4367,9 @@ dependencies = [
 
 [[package]]
 name = "web-sys"
-version = "0.3.72"
+version = "0.3.73"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112"
+checksum = "476364ff87d0ae6bfb661053a9104ab312542658c3d8f963b7ace80b6f9b26b9"
 dependencies = [
  "js-sys",
  "wasm-bindgen",
@@ -4578,9 +4619,9 @@ dependencies = [
 
 [[package]]
 name = "yoke"
-version = "0.7.4"
+version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5"
+checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40"
 dependencies = [
  "serde",
  "stable_deref_trait",
@@ -4590,9 +4631,9 @@ dependencies = [
 
 [[package]]
 name = "yoke-derive"
-version = "0.7.4"
+version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95"
+checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4623,18 +4664,18 @@ dependencies = [
 
 [[package]]
 name = "zerofrom"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55"
+checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e"
 dependencies = [
  "zerofrom-derive",
 ]
 
 [[package]]
 name = "zerofrom-derive"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5"
+checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 784d47220c..743ec1b4a7 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -49,6 +49,7 @@ datafusion = { path = "../datafusion/core", version = "43.0.0", features = [
     "unicode_expressions",
     "compression",
 ] }
+datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" }
 dirs = "5.0.1"
 env_logger = "0.11"
 futures = "0.3"
diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index c622463de0..d7ca48d638 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -24,13 +24,13 @@ use async_trait::async_trait;
 
 use datafusion::catalog::Session;
 use datafusion::common::{plan_err, Column};
-use datafusion::datasource::function::TableFunctionImpl;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::scalar::ScalarValue;
+use datafusion_catalog::TableFunctionImpl;
 use parquet::basic::ConvertedType;
 use parquet::data_type::{ByteArray, FixedLenByteArray};
 use parquet::file::reader::FileReader;
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 05850e7b3a..d8aaad801e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -60,6 +60,7 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 dashmap = { workspace = true }
 datafusion = { workspace = true, default-features = true, features = ["avro"] }
+datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs
index 6faa397ef6..f32560ede6 100644
--- a/datafusion-examples/examples/simple_udtf.rs
+++ b/datafusion-examples/examples/simple_udtf.rs
@@ -21,13 +21,13 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::catalog::Session;
-use datafusion::datasource::function::TableFunctionImpl;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
 use datafusion::execution::context::ExecutionProps;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
+use datafusion_catalog::TableFunctionImpl;
 use datafusion_common::{plan_err, ScalarValue};
 use datafusion_expr::simplify::SimplifyContext;
 use datafusion_expr::{Expr, TableType};
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index d771930de2..b6752191d9 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -25,9 +25,11 @@ use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion_common::Result;
 use datafusion_common::{not_impl_err, Constraints, Statistics};
+use datafusion_expr::Expr;
+
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{
-    CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
+    CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
 };
 use datafusion_physical_plan::ExecutionPlan;
 
@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>>;
 }
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Debug + Sync + Send {
+    /// Create a table provider
+    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
+}
+
+/// A table that uses a function to generate data
+#[derive(Debug)]
+pub struct TableFunction {
+    /// Name of the table function
+    name: String,
+    /// Function implementation
+    fun: Arc<dyn TableFunctionImpl>,
+}
+
+impl TableFunction {
+    /// Create a new table function
+    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
+        Self { name, fun }
+    }
+
+    /// Get the name of the table function
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Get the implementation of the table function
+    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
+        &self.fun
+    }
+
+    /// Get the function implementation and generate a table
+    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        self.fun.call(args)
+    }
+}
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 268e0fb17f..6c5a31e362 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -103,6 +103,7 @@ datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true }
 datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-nested = { workspace = true, optional = true }
+datafusion-functions-table = { workspace = true }
 datafusion-functions-window = { workspace = true }
 datafusion-optimizer = { workspace = true }
 datafusion-physical-expr = { workspace = true }
diff --git a/datafusion/core/src/datasource/function.rs b/datafusion/core/src/datasource/function.rs
deleted file mode 100644
index 37ce59f820..0000000000
--- a/datafusion/core/src/datasource/function.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! A table that uses a function to generate data
-
-use super::TableProvider;
-
-use datafusion_common::Result;
-use datafusion_expr::Expr;
-
-use std::fmt::Debug;
-use std::sync::Arc;
-
-/// A trait for table function implementations
-pub trait TableFunctionImpl: Debug + Sync + Send {
-    /// Create a table provider
-    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
-}
-
-/// A table that uses a function to generate data
-#[derive(Debug)]
-pub struct TableFunction {
-    /// Name of the table function
-    name: String,
-    /// Function implementation
-    fun: Arc<dyn TableFunctionImpl>,
-}
-
-impl TableFunction {
-    /// Create a new table function
-    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
-        Self { name, fun }
-    }
-
-    /// Get the name of the table function
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    /// Get the implementation of the table function
-    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
-        &self.fun
-    }
-
-    /// Get the function implementation and generate a table
-    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
-        self.fun.call(args)
-    }
-}
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index ad369b75e1..7d3fe9ddd7 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -25,7 +25,6 @@ pub mod default_table_source;
 pub mod dynamic_file;
 pub mod empty;
 pub mod file_format;
-pub mod function;
 pub mod listing;
 pub mod listing_table_factory;
 pub mod memory;
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 8ec8349164..4cc3200df1 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -30,9 +30,8 @@ use crate::{
     catalog_common::memory::MemorySchemaProvider,
     catalog_common::MemoryCatalogProvider,
     dataframe::DataFrame,
-    datasource::{
-        function::{TableFunction, TableFunctionImpl},
-        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
+    datasource::listing::{
+        ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
     },
     datasource::{provider_as_source, MemTable, ViewTable},
     error::{DataFusionError, Result},
@@ -74,7 +73,9 @@ use crate::datasource::dynamic_file::DynamicListTableFactory;
 use crate::execution::session_state::SessionStateBuilder;
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory};
+use datafusion_catalog::{
+    DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory,
+};
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
 pub use datafusion_expr::execution_props::ExecutionProps;
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index e2be3cf374..4ccad5ffd3 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -24,7 +24,6 @@ use crate::catalog_common::information_schema::{
 use crate::catalog_common::MemoryCatalogProviderList;
 use crate::datasource::cte_worktable::CteWorkTable;
 use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
-use crate::datasource::function::{TableFunction, TableFunctionImpl};
 use crate::datasource::provider_as_source;
 use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
 use crate::execution::SessionStateDefaults;
@@ -33,7 +32,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use arrow_schema::{DataType, SchemaRef};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use datafusion_catalog::Session;
+use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
 use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
@@ -1074,6 +1073,7 @@ impl SessionStateBuilder {
             .with_scalar_functions(SessionStateDefaults::default_scalar_functions())
             .with_aggregate_functions(SessionStateDefaults::default_aggregate_functions())
             .with_window_functions(SessionStateDefaults::default_window_functions())
+            .with_table_function_list(SessionStateDefaults::default_table_functions())
     }
 
     /// Set the session id.
@@ -1188,6 +1188,19 @@ impl SessionStateBuilder {
         self
     }
 
+    /// Set the list of [`TableFunction`]s
+    pub fn with_table_function_list(
+        mut self,
+        table_functions: Vec<Arc<TableFunction>>,
+    ) -> Self {
+        let functions = table_functions
+            .into_iter()
+            .map(|f| (f.name().to_string(), f))
+            .collect();
+        self.table_functions = Some(functions);
+        self
+    }
+
     /// Set the map of [`ScalarUDF`]s
     pub fn with_scalar_functions(
         mut self,
diff --git a/datafusion/core/src/execution/session_state_defaults.rs 
b/datafusion/core/src/execution/session_state_defaults.rs
index 7ba332c520..106082bc7b 100644
--- a/datafusion/core/src/execution/session_state_defaults.rs
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -29,7 +29,8 @@ use crate::datasource::provider::DefaultTableFactory;
 use crate::execution::context::SessionState;
 #[cfg(feature = "nested_expressions")]
 use crate::functions_nested;
-use crate::{functions, functions_aggregate, functions_window};
+use crate::{functions, functions_aggregate, functions_table, functions_window};
+use datafusion_catalog::TableFunction;
 use datafusion_execution::config::SessionConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::runtime_env::RuntimeEnv;
@@ -119,6 +120,11 @@ impl SessionStateDefaults {
         functions_window::all_default_window_functions()
     }
 
+    /// returns the list of default [`TableFunction`]s
+    pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
+        functions_table::all_default_table_functions()
+    }
+
     /// returns the list of default [`FileFormatFactory']'s
     pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
         let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index d049e774d7..a1b18b8bfe 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -773,6 +773,11 @@ pub mod functions_window {
     pub use datafusion_functions_window::*;
 }
 
+/// re-export of [`datafusion_functions_table`] crate
+pub mod functions_table {
+    pub use datafusion_functions_table::*;
+}
+
 /// re-export of variable provider for `@name` and `@@name` style runtime 
values.
 pub mod variable {
     pub use datafusion_expr::var_provider::{VarProvider, VarType};
diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs 
b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
index 0cc156866d..39f10ef11a 100644
--- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
@@ -21,7 +21,6 @@ use arrow::csv::ReaderBuilder;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::function::TableFunctionImpl;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
 use datafusion::execution::TaskContext;
@@ -29,6 +28,7 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::SessionContext;
 use datafusion_catalog::Session;
+use datafusion_catalog::TableFunctionImpl;
 use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue};
 use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
 use std::fs::File;
diff --git a/datafusion/functions-table/Cargo.toml 
b/datafusion/functions-table/Cargo.toml
new file mode 100644
index 0000000000..f667bdde58
--- /dev/null
+++ b/datafusion/functions-table/Cargo.toml
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-functions-table"
+description = "Table functions (UDTFs) for the DataFusion query engine"
+keywords = ["datafusion", "logical", "plan", "expressions"]
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+rust-version = { workspace = true }
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_functions_table"
+path = "src/lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+ahash = { workspace = true }
+arrow = { workspace = true }
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-functions-aggregate-common = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+half = { workspace = true }
+indexmap = { workspace = true }
+log = { workspace = true }
+parking_lot = { workspace = true }
+paste = "1.0.14"
+
+[dev-dependencies]
+arrow = { workspace = true, features = ["test_utils"] }
+criterion = "0.5"
+rand = { workspace = true }
diff --git a/datafusion/functions-table/src/generate_series.rs 
b/datafusion/functions-table/src/generate_series.rs
new file mode 100644
index 0000000000..ced43ea8f0
--- /dev/null
+++ b/datafusion/functions-table/src/generate_series.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::Int64Array;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_catalog::TableFunctionImpl;
+use datafusion_catalog::TableProvider;
+use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue};
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::ExecutionPlan;
+use parking_lot::RwLock;
+use std::fmt;
+use std::sync::Arc;
+
+/// Table that generates a series of integers from `start` (inclusive) to `end` (inclusive)
+#[derive(Debug, Clone)]
+struct GenerateSeriesTable {
+    schema: SchemaRef,
+    // None if input is Null
+    start: Option<i64>,
+    // None if input is Null
+    end: Option<i64>,
+}
+
+/// Generator state for producing the series of integers from `start` (inclusive) to `end` (inclusive)
+#[derive(Debug, Clone)]
+struct GenerateSeriesState {
+    schema: SchemaRef,
+    start: i64, // Kept for display
+    end: i64,
+    batch_size: usize,
+
+    /// Tracks current position when generating table
+    current: i64,
+}
+
+/// Detail to display for 'Explain' plan
+impl fmt::Display for GenerateSeriesState {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "generate_series: start={}, end={}, batch_size={}",
+            self.start, self.end, self.batch_size
+        )
+    }
+}
+
+impl LazyBatchGenerator for GenerateSeriesState {
+    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        // Check if we've reached the end
+        if self.current > self.end {
+            return Ok(None);
+        }
+
+        // Construct batch
+        let batch_end = (self.current + self.batch_size as i64 - 
1).min(self.end);
+        let array = Int64Array::from_iter_values(self.current..=batch_end);
+        let batch = RecordBatch::try_new(self.schema.clone(), 
vec![Arc::new(array)])?;
+
+        // Update current position for next batch
+        self.current = batch_end + 1;
+
+        Ok(Some(batch))
+    }
+}
+
+#[async_trait]
+impl TableProvider for GenerateSeriesTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let batch_size = state.config_options().execution.batch_size;
+        match (self.start, self.end) {
+            (Some(start), Some(end)) => {
+                if start > end {
+                    return plan_err!(
+                        "End value must be greater than or equal to start 
value"
+                    );
+                }
+
+                Ok(Arc::new(LazyMemoryExec::try_new(
+                    self.schema.clone(),
+                    vec![Arc::new(RwLock::new(GenerateSeriesState {
+                        schema: self.schema.clone(),
+                        start,
+                        end,
+                        current: start,
+                        batch_size,
+                    }))],
+                )?))
+            }
+            _ => {
+                // Either start or end is NULL: return a generator that outputs 0 rows (it starts with `current > end`)
+                Ok(Arc::new(LazyMemoryExec::try_new(
+                    self.schema.clone(),
+                    vec![Arc::new(RwLock::new(GenerateSeriesState {
+                        schema: self.schema.clone(),
+                        start: 0,
+                        end: 0,
+                        current: 1,
+                        batch_size,
+                    }))],
+                )?))
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct GenerateSeriesFunc {}
+
+impl TableFunctionImpl for GenerateSeriesFunc {
+    // Checks the number and types of the input `exprs`. Range validity (e.g. start <= end)
+    // is checked later, in `TableProvider::scan`
+    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        // TODO: support 1 or 3 arguments following DuckDB:
+        // <https://duckdb.org/docs/sql/functions/list#generate_series>
+        if exprs.len() == 3 || exprs.len() == 1 {
+            return not_impl_err!("generate_series does not support 1 or 3 
arguments");
+        }
+
+        if exprs.len() != 2 {
+            return plan_err!("generate_series expects 2 arguments");
+        }
+
+        let start = match &exprs[0] {
+            Expr::Literal(ScalarValue::Null) => None,
+            Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n),
+            _ => return plan_err!("First argument must be an integer literal"),
+        };
+
+        let end = match &exprs[1] {
+            Expr::Literal(ScalarValue::Null) => None,
+            Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n),
+            _ => return plan_err!("Second argument must be an integer 
literal"),
+        };
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int64,
+            false,
+        )]));
+
+        Ok(Arc::new(GenerateSeriesTable { schema, start, end }))
+    }
+}
diff --git a/datafusion/functions-table/src/lib.rs 
b/datafusion/functions-table/src/lib.rs
new file mode 100644
index 0000000000..9ea4c0c899
--- /dev/null
+++ b/datafusion/functions-table/src/lib.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod generate_series;
+
+use datafusion_catalog::TableFunction;
+use std::sync::Arc;
+
+/// Returns all default table functions
+pub fn all_default_table_functions() -> Vec<Arc<TableFunction>> {
+    vec![generate_series()]
+}
+
+/// Creates a singleton instance of a table function
+/// - `$module`: A struct implementing `TableFunctionImpl` to create the function from
+/// - `$name`: The name to give to the created function
+///
+/// The instance is stored in a `OnceLock`, so the `TableFunction` is only created once.
+/// Note: the expansion declares a module-level `static INSTANCE`, so the macro can be
+/// invoked at most once per module.
+#[macro_export]
+macro_rules! create_udtf_function {
+    ($module:path, $name:expr) => {
+        paste::paste! {
+            static INSTANCE: std::sync::OnceLock<Arc<TableFunction>> = 
std::sync::OnceLock::new();
+
+            pub fn [<$name:lower>]() -> Arc<TableFunction> {
+                INSTANCE.get_or_init(|| {
+                    Arc::new(TableFunction::new(
+                        $name.to_string(),
+                        Arc::new($module {}),
+                    ))
+                }).clone()
+            }
+        }
+    };
+}
+
+create_udtf_function!(generate_series::GenerateSeriesFunc, "generate_series");
diff --git a/datafusion/physical-plan/src/memory.rs 
b/datafusion/physical-plan/src/memory.rs
index 272dcdc95b..bf6294f5a5 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading in-memory batches of data
 
+use parking_lot::RwLock;
 use std::any::Any;
 use std::fmt;
 use std::sync::Arc;
@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
+    /// Generate the next batch; return `None` when no more batches are available
+    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
+}
+
+/// Execution plan for lazy in-memory batches of data
+///
+/// This plan generates output batches lazily: unlike `MemoryExec`, it does not have to
+/// buffer all batches in memory up front, and thus consumes constant memory.
+pub struct LazyMemoryExec {
+    /// Schema representing the data
+    schema: SchemaRef,
+    /// Functions to generate batches for each partition
+    batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
+    /// Plan properties cache storing equivalence properties, partitioning, and execution mode
+    cache: PlanProperties,
+}
+
+impl LazyMemoryExec {
+    /// Create a new lazy memory execution plan
+    pub fn try_new(
+        schema: SchemaRef,
+        generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
+    ) -> Result<Self> {
+        let cache = PlanProperties::new(
+            EquivalenceProperties::new(Arc::clone(&schema)),
+            Partitioning::RoundRobinBatch(generators.len()),
+            ExecutionMode::Bounded,
+        );
+        Ok(Self {
+            schema,
+            batch_generators: generators,
+            cache,
+        })
+    }
+}
+
+impl fmt::Debug for LazyMemoryExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("LazyMemoryExec")
+            .field("schema", &self.schema)
+            .field("batch_generators", &self.batch_generators)
+            .finish()
+    }
+}
+
+impl DisplayAs for LazyMemoryExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "LazyMemoryExec: partitions={}, batch_generators=[{}]",
+                    self.batch_generators.len(),
+                    self.batch_generators
+                        .iter()
+                        .map(|g| g.read().to_string())
+                        .collect::<Vec<_>>()
+                        .join(", ")
+                )
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for LazyMemoryExec {
+    fn name(&self) -> &'static str {
+        "LazyMemoryExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("Children cannot be replaced in LazyMemoryExec")
+        }
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        if partition >= self.batch_generators.len() {
+            return internal_err!(
+                "Invalid partition {} for LazyMemoryExec with {} partitions",
+                partition,
+                self.batch_generators.len()
+            );
+        }
+
+        Ok(Box::pin(LazyMemoryStream {
+            schema: Arc::clone(&self.schema),
+            generator: Arc::clone(&self.batch_generators[partition]),
+        }))
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&self.schema))
+    }
+}
+
+/// Stream that generates record batches on demand
+pub struct LazyMemoryStream {
+    schema: SchemaRef,
+    /// Generator to produce batches
+    ///
+    /// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream
+    /// should have a unique `LazyBatchGenerator`. Use RepartitionExec or
+    /// construct multiple `LazyMemoryStream`s during planning to enable
+    /// parallel execution.
+    /// Sharing a generator between streams should be done with caution.
+    generator: Arc<RwLock<dyn LazyBatchGenerator>>,
+}
+
+impl Stream for LazyMemoryStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let batch = self.generator.write().generate_next_batch();
+
+        match batch {
+            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
+            Ok(None) => Poll::Ready(None),
+            Err(e) => Poll::Ready(Some(Err(e))),
+        }
+    }
+}
+
+impl RecordBatchStream for LazyMemoryStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
 #[cfg(test)]
-mod tests {
+mod memory_exec_tests {
     use std::sync::Arc;
 
     use crate::memory::MemoryExec;
@@ -416,3 +574,123 @@ mod tests {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod lazy_memory_tests {
+    use super::*;
+    use arrow::array::Int64Array;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use futures::StreamExt;
+
+    #[derive(Debug, Clone)]
+    struct TestGenerator {
+        counter: i64,
+        max_batches: i64,
+        batch_size: usize,
+        schema: SchemaRef,
+    }
+
+    impl fmt::Display for TestGenerator {
+        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+            write!(
+                f,
+                "TestGenerator: counter={}, max_batches={}, batch_size={}",
+                self.counter, self.max_batches, self.batch_size
+            )
+        }
+    }
+
+    impl LazyBatchGenerator for TestGenerator {
+        fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
+            if self.counter >= self.max_batches {
+                return Ok(None);
+            }
+
+            let array = Int64Array::from_iter_values(
+                (self.counter * self.batch_size as i64)
+                    ..(self.counter * self.batch_size as i64 + self.batch_size 
as i64),
+            );
+            self.counter += 1;
+            Ok(Some(RecordBatch::try_new(
+                Arc::clone(&self.schema),
+                vec![Arc::new(array)],
+            )?))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_lazy_memory_exec() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int64, false)]));
+        let generator = TestGenerator {
+            counter: 0,
+            max_batches: 3,
+            batch_size: 2,
+            schema: Arc::clone(&schema),
+        };
+
+        let exec =
+            LazyMemoryExec::try_new(schema, 
vec![Arc::new(RwLock::new(generator))])?;
+
+        // Test schema
+        assert_eq!(exec.schema().fields().len(), 1);
+        assert_eq!(exec.schema().field(0).name(), "a");
+
+        // Test execution
+        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
+        let batches: Vec<_> = stream.collect::<Vec<_>>().await;
+
+        assert_eq!(batches.len(), 3);
+
+        // Verify batch contents
+        let batch0 = batches[0].as_ref().unwrap();
+        let array0 = batch0
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(array0.values(), &[0, 1]);
+
+        let batch1 = batches[1].as_ref().unwrap();
+        let array1 = batch1
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(array1.values(), &[2, 3]);
+
+        let batch2 = batches[2].as_ref().unwrap();
+        let array2 = batch2
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(array2.values(), &[4, 5]);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_lazy_memory_exec_invalid_partition() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int64, false)]));
+        let generator = TestGenerator {
+            counter: 0,
+            max_batches: 1,
+            batch_size: 1,
+            schema: Arc::clone(&schema),
+        };
+
+        let exec =
+            LazyMemoryExec::try_new(schema, 
vec![Arc::new(RwLock::new(generator))])?;
+
+        // Test invalid partition
+        let result = exec.execute(1, Arc::new(TaskContext::default()));
+
+        // partition is 0-indexed, so only partition 0 should exist
+        assert!(matches!(
+            result,
+            Err(e) if e.to_string().contains("Invalid partition 1 for 
LazyMemoryExec with 1 partitions")
+        ));
+
+        Ok(())
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/table_functions.slt 
b/datafusion/sqllogictest/test_files/table_functions.slt
new file mode 100644
index 0000000000..12402e0d70
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/table_functions.slt
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test generate_series table function
+
+query I rowsort
+SELECT * FROM generate_series(1, 5)
+----
+1
+2
+3
+4
+5
+
+query I rowsort
+SELECT * FROM generate_series(1, 1)
+----
+1
+
+query I rowsort
+SELECT * FROM generate_series(3, 6)
+----
+3
+4
+5
+6
+
+query I rowsort
+SELECT SUM(v1) FROM generate_series(1, 5) t1(v1)
+----
+15
+
+# Test generate_series with WHERE clause
+query I rowsort
+SELECT * FROM generate_series(1, 10) t1(v1) WHERE v1 % 2 = 0
+----
+10
+2
+4
+6
+8
+
+# Test generate_series with ORDER BY
+query I
+SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC
+----
+5
+4
+3
+2
+1
+
+# Test generate_series with LIMIT
+query I rowsort
+SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5
+----
+1
+2
+3
+4
+5
+
+# Test generate_series in subquery
+query I rowsort
+SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1))
+----
+11
+12
+13
+
+# Test generate_series with JOIN
+query II rowsort
+SELECT a.v1, b.v1 
+FROM generate_series(1, 3) a(v1)
+JOIN generate_series(2, 4) b(v1)
+ON a.v1 = b.v1 - 1
+----
+1 2
+2 3
+3 4
+
+query I
+SELECT * FROM generate_series(NULL, 5)
+----
+
+query I
+SELECT * FROM generate_series(1, NULL)
+----
+
+query I
+SELECT * FROM generate_series(NULL, NULL)
+----
+
+query TT
+EXPLAIN SELECT * FROM generate_series(1, 5)
+----
+logical_plan TableScan: tmp_table projection=[value]
+physical_plan LazyMemoryExec: partitions=1, batch_generators=[generate_series: 
start=1, end=5, batch_size=8192]
+
+#
+# Test generate_series with invalid arguments
+#
+
+query error DataFusion error: Error during planning: End value must be greater 
than or equal to start value
+SELECT * FROM generate_series(5, 1)
+
+statement error DataFusion error: This feature is not implemented: 
generate_series does not support 1 or 3 arguments
+SELECT * FROM generate_series(1, 5, NULL)
+
+statement error DataFusion error: This feature is not implemented: 
generate_series does not support 1 or 3 arguments
+SELECT * FROM generate_series(1)
+
+statement error DataFusion error: Error during planning: generate_series 
expects 2 arguments
+SELECT * FROM generate_series(1, 2, 3, 4)
+
+statement error DataFusion error: Error during planning: Second argument must 
be an integer literal
+SELECT * FROM generate_series(1, '2')
+
+statement error DataFusion error: Error during planning: First argument must 
be an integer literal
+SELECT * FROM generate_series('foo', 'bar')
+
+# UDF and UDTF `generate_series` can be used simultaneously
+query ? rowsort
+SELECT generate_series(1, t1.end) FROM generate_series(3, 5) as t1(end)
+----
+[1, 2, 3, 4, 5]
+[1, 2, 3, 4]
+[1, 2, 3]
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to