This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git
The following commit(s) were added to refs/heads/main by this push:
new 151a0e2 Upgrade to DataFusion 43, fix a bug, add more tests (#53)
151a0e2 is described below
commit 151a0e280bcd3239210a65d62a69d596844fc90d
Author: Andy Grove <[email protected]>
AuthorDate: Sat Dec 14 11:28:39 2024 -0700
Upgrade to DataFusion 43, fix a bug, add more tests (#53)
* Implementing Unit testing for Python
* Installing all deps in CI
* Adding maturin develop
* Restoring correct input partitioning
* Generated new plans
* Restored test plans for ignored tests
* tests
* fix
* fix
* update expected plans
* update expected plans
* revert some changes
* remove comment
* updated plans
* upgrade to DF 43
* update deps, more tests
* bug fix
---------
Co-authored-by: Edmondo Porcu <[email protected]>
---
.github/workflows/rust.yml | 13 ++-
Cargo.lock | 186 +++++++++++++++++++----------------
Cargo.toml | 4 +-
datafusion_ray/context.py | 5 +-
datafusion_ray/tests/test_context.py | 26 -----
examples/tips.py | 2 +-
pyproject.toml | 4 +-
requirements-in.txt | 4 +-
{datafusion_ray => scripts}/main.py | 0
src/planner.rs | 9 +-
src/query_stage.rs | 14 ++-
testdata/expected-plans/q1.txt | 8 +-
testdata/expected-plans/q10.txt | 4 +-
testdata/expected-plans/q11.txt | 20 ++--
testdata/expected-plans/q12.txt | 10 +-
testdata/expected-plans/q13.txt | 12 +--
testdata/expected-plans/q14.txt | 2 +-
testdata/expected-plans/q16.txt | 20 ++--
testdata/expected-plans/q17.txt | 4 +-
testdata/expected-plans/q18.txt | 8 +-
testdata/expected-plans/q19.txt | 22 ++---
testdata/expected-plans/q2.txt | 32 +++---
testdata/expected-plans/q20.txt | 22 ++---
testdata/expected-plans/q21.txt | 22 ++---
testdata/expected-plans/q22.txt | 20 ++--
testdata/expected-plans/q3.txt | 12 +--
testdata/expected-plans/q5.txt | 10 +-
testdata/expected-plans/q7.txt | 30 +++---
testdata/expected-plans/q8.txt | 16 +--
testdata/expected-plans/q9.txt | 12 +--
tests/test_context.py | 73 ++++++++++++++
31 files changed, 346 insertions(+), 280 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 3e03704..8620498 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -32,14 +32,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Install protobuf compiler
shell: bash
run: sudo apt-get install protobuf-compiler
- name: Build Rust code
run: cargo build --verbose
- name: Set up Python
- uses: actions/setup-python@v2
+ uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install test dependencies
@@ -49,5 +49,12 @@ jobs:
- name: Generate test data
run: |
./scripts/gen-test-data.sh
- - name: Run tests
+ - name: Run Rust tests
run: cargo test --verbose
+ - name: Run Python tests
+ run: |
+ python -m venv venv
+ source venv/bin/activate
+ pip install -r requirements-in.txt
+ maturin develop
+ python -m pytest
diff --git a/Cargo.lock b/Cargo.lock
index a976126..e7a25c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -130,9 +130,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "arrow"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9ba0d7248932f4e2a12fb37f0a2e3ec82b3bdedbac2a1dce186e036843b8f8c"
+checksum = "c91839b07e474b3995035fd8ac33ee54f9c9ccbbb1ea33d9909c71bffdf1259d"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -152,9 +152,9 @@ dependencies = [
[[package]]
name = "arrow-arith"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d60afcdc004841a5c8d8da4f4fa22d64eb19c0c01ef4bcedd77f175a7cf6e38f"
+checksum = "855c57c4efd26722b044dcd3e348252560e3e0333087fb9f6479dc0bf744054f"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -167,9 +167,9 @@ dependencies = [
[[package]]
name = "arrow-array"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f16835e8599dbbb1659fd869d865254c4cf32c6c2bb60b6942ac9fc36bfa5da"
+checksum = "bd03279cea46569acf9295f6224fbc370c5df184b4d2ecfe97ccb131d5615a7f"
dependencies = [
"ahash",
"arrow-buffer",
@@ -178,15 +178,15 @@ dependencies = [
"chrono",
"chrono-tz",
"half",
- "hashbrown 0.14.5",
+ "hashbrown 0.15.2",
"num",
]
[[package]]
name = "arrow-buffer"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a1f34f0faae77da6b142db61deba2cb6d60167592b178be317b341440acba80"
+checksum = "9e4a9b9b1d6d7117f6138e13bc4dd5daa7f94e671b70e8c9c4dc37b4f5ecfc16"
dependencies = [
"bytes",
"half",
@@ -195,9 +195,9 @@ dependencies = [
[[package]]
name = "arrow-cast"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "450e4abb5775bca0740bec0bcf1b1a5ae07eff43bd625661c4436d8e8e4540c4"
+checksum = "bc70e39916e60c5b7af7a8e2719e3ae589326039e1e863675a008bee5ffe90fd"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -216,9 +216,9 @@ dependencies = [
[[package]]
name = "arrow-csv"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d3a4e4d63830a341713e35d9a42452fbc6241d5f42fa5cf6a4681b8ad91370c4"
+checksum = "789b2af43c1049b03a8d088ff6b2257cdcea1756cd76b174b1f2600356771b97"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -235,9 +235,9 @@ dependencies = [
[[package]]
name = "arrow-data"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b1e618bbf714c7a9e8d97203c806734f012ff71ae3adc8ad1b075689f540634"
+checksum = "e4e75edf21ffd53744a9b8e3ed11101f610e7ceb1a29860432824f1834a1f623"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -247,9 +247,9 @@ dependencies = [
[[package]]
name = "arrow-ipc"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f98e983549259a2b97049af7edfb8f28b8911682040e99a94e4ceb1196bd65c2"
+checksum = "d186a909dece9160bf8312f5124d797884f608ef5435a36d9d608e0b2a9bcbf8"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -262,9 +262,9 @@ dependencies = [
[[package]]
name = "arrow-json"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b198b9c6fcf086501730efbbcb483317b39330a116125af7bb06467d04b352a3"
+checksum = "b66ff2fedc1222942d0bd2fd391cb14a85baa3857be95c9373179bd616753b85"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -282,9 +282,9 @@ dependencies = [
[[package]]
name = "arrow-ord"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2427f37b4459a4b9e533045abe87a5183a5e0995a3fc2c2fd45027ae2cc4ef3f"
+checksum = "ece7b5bc1180e6d82d1a60e1688c199829e8842e38497563c3ab6ea813e527fd"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -297,9 +297,9 @@ dependencies = [
[[package]]
name = "arrow-row"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "15959657d92e2261a7a323517640af87f5afd9fd8a6492e424ebee2203c567f6"
+checksum = "745c114c8f0e8ce211c83389270de6fbe96a9088a7b32c2a041258a443fe83ff"
dependencies = [
"ahash",
"arrow-array",
@@ -311,18 +311,18 @@ dependencies = [
[[package]]
name = "arrow-schema"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fbf0388a18fd7f7f3fe3de01852d30f54ed5182f9004db700fbe3ba843ed2794"
+checksum = "b95513080e728e4cec37f1ff5af4f12c9688d47795d17cda80b6ec2cf74d4678"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "arrow-select"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b83e5723d307a38bf00ecd2972cd078d1339c7fd3eb044f609958a9a24463f3a"
+checksum = "8e415279094ea70323c032c6e739c48ad8d80e78a09bef7117b8718ad5bf3722"
dependencies = [
"ahash",
"arrow-array",
@@ -334,9 +334,9 @@ dependencies = [
[[package]]
name = "arrow-string"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ab3db7c09dd826e74079661d84ed01ed06547cf75d52c2818ef776d0d852305"
+checksum = "11d956cae7002eb8d83a27dbd34daaea1cf5b75852f0b84deb4d93a276e92bbf"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -459,9 +459,9 @@ dependencies = [
[[package]]
name = "brotli"
-version = "6.0.0"
+version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
+checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
@@ -702,9 +702,9 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee907b081e45e1d14e1f327e89ef134f91fcebad0bfc2dc229fa9f6044379682"
+checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05"
dependencies = [
"ahash",
"apache-avro",
@@ -761,9 +761,9 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c2b914f6e33c429af7d8696c72a47ed9225d7e2b82c747ebdfa2408ed53579f"
+checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560"
dependencies = [
"arrow-schema",
"async-trait",
@@ -776,9 +776,9 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a84f8e76330c582a6b8ada0b2c599ca46cfe46b7585e458fc3f4092bc722a18"
+checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c"
dependencies = [
"ahash",
"apache-avro",
@@ -789,6 +789,7 @@ dependencies = [
"chrono",
"half",
"hashbrown 0.14.5",
+ "indexmap",
"instant",
"libc",
"num_cpus",
@@ -802,9 +803,9 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf08cc30d92720d557df13bd5a5696213bd5ea0f38a866d8d85055d866fba774"
+checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685"
dependencies = [
"log",
"tokio",
@@ -812,9 +813,9 @@ dependencies = [
[[package]]
name = "datafusion-execution"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "86bc4183d5c45b9f068a6f351678a0d1eb1225181424542bb75db18ec280b822"
+checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361"
dependencies = [
"arrow",
"chrono",
@@ -833,9 +834,9 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "202119ce58e4d103e37ae64aab40d4e574c97bdd2bea994bf307b175fcbfa74d"
+checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246"
dependencies = [
"ahash",
"arrow",
@@ -845,7 +846,9 @@ dependencies = [
"datafusion-common",
"datafusion-expr-common",
"datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
"datafusion-physical-expr-common",
+ "indexmap",
"paste",
"serde_json",
"sqlparser",
@@ -855,20 +858,21 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f8b181ce8569216abb01ef3294aa16c0a40d7d39350c2ff01ede00f167a535f2"
+checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2"
dependencies = [
"arrow",
"datafusion-common",
+ "itertools 0.13.0",
"paste",
]
[[package]]
name = "datafusion-functions"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e4124b8066444e05a24472f852e94cf56546c0f4d92d00f018f207216902712"
+checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162"
dependencies = [
"arrow",
"arrow-buffer",
@@ -893,9 +897,9 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b94acdac235ea21810150a89751617ef2db7e32eba27f54be48a81bde2bfe119"
+checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741"
dependencies = [
"ahash",
"arrow",
@@ -907,16 +911,16 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"half",
+ "indexmap",
"log",
"paste",
- "sqlparser",
]
[[package]]
name = "datafusion-functions-aggregate-common"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c9ea085bbf900bf16e2ca0f56fc56236b2e4f2e1a2cccb67bcd83c5ab4ad0ef"
+checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143"
dependencies = [
"ahash",
"arrow",
@@ -928,9 +932,9 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c882e61665ed60c5ce9b061c1e587aeb8ae5ae4bcb5e5f2465139ab25328e0f"
+checksum = "fca25bbb87323716d05e54114666e942172ccca23c5a507e9c7851db6e965317"
dependencies = [
"arrow",
"arrow-array",
@@ -951,21 +955,34 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "98a354ce96df3ca6d025093adac9fd55ca09931c9b6f2630140721a95873fde4"
+checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe"
dependencies = [
"datafusion-common",
"datafusion-expr",
+ "datafusion-functions-window-common",
+ "datafusion-physical-expr",
"datafusion-physical-expr-common",
"log",
+ "paste",
+]
+
+[[package]]
+name = "datafusion-functions-window-common"
+version = "43.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b"
+dependencies = [
+ "datafusion-common",
+ "datafusion-physical-expr-common",
]
[[package]]
name = "datafusion-optimizer"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "baf677c74fb7b5a1899ef52709e4a70fff3ed80bdfb4bbe495909810e83d5f39"
+checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa"
dependencies = [
"arrow",
"async-trait",
@@ -983,9 +1000,9 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30b077999f6eb6c43d6b25bc66332a3be2f693c382840f008dd763b8540f9530"
+checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335"
dependencies = [
"ahash",
"arrow",
@@ -994,30 +1011,26 @@ dependencies = [
"arrow-ord",
"arrow-schema",
"arrow-string",
- "base64",
"chrono",
"datafusion-common",
- "datafusion-execution",
"datafusion-expr",
"datafusion-expr-common",
"datafusion-functions-aggregate-common",
"datafusion-physical-expr-common",
"half",
"hashbrown 0.14.5",
- "hex",
"indexmap",
"itertools 0.13.0",
"log",
"paste",
"petgraph",
- "regex",
]
[[package]]
name = "datafusion-physical-expr-common"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dce847f885c2b13bbe29f5c8b7948797131aa470af6e16d2a94f4428b4f4f1bd"
+checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b"
dependencies = [
"ahash",
"arrow",
@@ -1029,13 +1042,15 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d13238e3b9fdd62a4c18760bfef714bb990d1e1d3430e9f416aae4b3cfaa71af"
+checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe"
dependencies = [
+ "arrow",
"arrow-schema",
"datafusion-common",
"datafusion-execution",
+ "datafusion-expr-common",
"datafusion-physical-expr",
"datafusion-physical-plan",
"itertools 0.13.0",
@@ -1043,9 +1058,9 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "faba6f55a7eaf0241d07d12c2640de52742646b10f754485d5192bdfe2c9ceae"
+checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24"
dependencies = [
"ahash",
"arrow",
@@ -1059,8 +1074,8 @@ dependencies = [
"datafusion-common-runtime",
"datafusion-execution",
"datafusion-expr",
- "datafusion-functions-aggregate",
"datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"futures",
@@ -1078,9 +1093,9 @@ dependencies = [
[[package]]
name = "datafusion-proto"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "585357d621fa03ea85a7fefca79ebc5ef0ee13a7f82be0762a414879a4d190a7"
+checksum = "f730f7fc5a20134d4e5ecdf7bbf392002ac58163d58423ea28a702dc077b06e1"
dependencies = [
"arrow",
"chrono",
@@ -1094,9 +1109,9 @@ dependencies = [
[[package]]
name = "datafusion-proto-common"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4db6534382f92f528bdb5d925b4214c31ffd84fa7fe1eff3ed0d2f1286851ab8"
+checksum = "12c225fe49e4f943e35446b263613ada7a9e9f8d647544e6b07037b9803567df"
dependencies = [
"arrow",
"chrono",
@@ -1107,15 +1122,16 @@ dependencies = [
[[package]]
name = "datafusion-sql"
-version = "42.0.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dad8d96a9b52e1aa24f9373696a815be828193efce7cb0bbd2140b6bb67d1819"
+checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51"
dependencies = [
"arrow",
"arrow-array",
"arrow-schema",
"datafusion-common",
"datafusion-expr",
+ "indexmap",
"log",
"regex",
"sqlparser",
@@ -1368,9 +1384,9 @@ dependencies = [
[[package]]
name = "hashbrown"
-version = "0.15.0"
+version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
+checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "heck"
@@ -1451,7 +1467,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
- "hashbrown 0.15.0",
+ "hashbrown 0.15.2",
]
[[package]]
@@ -1862,9 +1878,9 @@ dependencies = [
[[package]]
name = "parquet"
-version = "53.1.0"
+version = "53.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e"
+checksum = "2b449890367085eb65d7d3321540abc3d7babbd179ce31df0016e90719114191"
dependencies = [
"ahash",
"arrow-array",
@@ -1881,7 +1897,7 @@ dependencies = [
"flate2",
"futures",
"half",
- "hashbrown 0.14.5",
+ "hashbrown 0.15.2",
"lz4_flex",
"num",
"num-bigint",
@@ -2437,9 +2453,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
[[package]]
name = "sqlparser"
-version = "0.50.0"
+version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac"
+checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
dependencies = [
"log",
"sqlparser_derive",
diff --git a/Cargo.toml b/Cargo.toml
index 91b0d11..cf145c4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,8 +29,8 @@ rust-version = "1.62"
build = "build.rs"
[dependencies]
-datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
-datafusion-proto = "42.0.0"
+datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
+datafusion-proto = "43.0"
futures = "0.3"
glob = "0.3.1"
log = "0.4"
diff --git a/datafusion_ray/context.py b/datafusion_ray/context.py
index f2ef86f..0070220 100644
--- a/datafusion_ray/context.py
+++ b/datafusion_ray/context.py
@@ -115,7 +115,7 @@ def execute_query_partition(
"ph": "X",
}
print(json.dumps(event), end=",")
- return ret[0] if len(ret) == 1 else ret
+ return ret
class DatafusionRayContext:
@@ -143,7 +143,7 @@ class DatafusionRayContext:
df = self.df_ctx.sql(sql)
return self.plan(df.execution_plan())
- def plan(self, execution_plan: Any) -> pa.RecordBatch:
+ def plan(self, execution_plan: Any) -> List[pa.RecordBatch]:
graph = self.ctx.plan(execution_plan)
final_stage_id = graph.get_final_query_stage().id()
@@ -161,4 +161,3 @@ class DatafusionRayContext:
# assert len(partitions) == 1, len(partitions)
result_set = ray.get(partitions[0])
return result_set
-
diff --git a/datafusion_ray/tests/test_context.py b/datafusion_ray/tests/test_context.py
deleted file mode 100644
index 40b2578..0000000
--- a/datafusion_ray/tests/test_context.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from datafusion_ray import Context
-from datafusion import SessionContext
-
-
-def test():
- df_ctx = SessionContext()
- ctx = Context(df_ctx, False)
- df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
- ctx.plan("SELECT * FROM tips")
diff --git a/examples/tips.py b/examples/tips.py
index 3a2fa91..67ac64e 100644
--- a/examples/tips.py
+++ b/examples/tips.py
@@ -52,4 +52,4 @@ df = (
)
ray_results = ray_ctx.plan(df.execution_plan())
-df_ctx.create_dataframe([[ray_results]]).show()
+df_ctx.create_dataframe([ray_results]).show()
diff --git a/pyproject.toml b/pyproject.toml
index 10e097e..3a4eb7d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,8 +28,8 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
- "datafusion>=42.0.0",
- "pyarrow>=11.0.0",
+ "datafusion>=43.0.0",
+ "pyarrow>=18.0.0",
"typing-extensions;python_version<'3.13'",
]
diff --git a/requirements-in.txt b/requirements-in.txt
index 3fa00a6..b8216e9 100644
--- a/requirements-in.txt
+++ b/requirements-in.txt
@@ -4,9 +4,9 @@ isort
maturin
mypy
numpy
-pyarrow
+pyarrow>=18.0.0
pytest
ray==2.37.0
-datafusion>=42.0.0
+datafusion>=43.0.0
toml
importlib_metadata; python_version < "3.8"
diff --git a/datafusion_ray/main.py b/scripts/main.py
similarity index 100%
rename from datafusion_ray/main.py
rename to scripts/main.py
diff --git a/src/planner.rs b/src/planner.rs
index 7d9fdf0..954d8e2 100644
--- a/src/planner.rs
+++ b/src/planner.rs
@@ -276,7 +276,6 @@ mod test {
do_test(6).await
}
- #[ignore = "non-deterministic IN clause"]
#[tokio::test]
async fn test_q7() -> TestResult<()> {
do_test(7).await
@@ -302,7 +301,6 @@ mod test {
do_test(11).await
}
- #[ignore = "non-deterministic IN clause"]
#[tokio::test]
async fn test_q12() -> TestResult<()> {
do_test(12).await
@@ -324,10 +322,6 @@ mod test {
do_test(15).await
}
- // This test is ignored because there is some non-determinism
- // in a part of the plan, see
- // https://github.com/edmondop/datafusion-ray/actions/runs/11180062292/job/31080996808"
- #[ignore = "non-deterministic IN clause"]
#[tokio::test]
async fn test_q16() -> TestResult<()> {
do_test(16).await
@@ -343,7 +337,6 @@ mod test {
do_test(18).await
}
- #[ignore = "non-deterministic IN clause"]
#[tokio::test]
async fn test_q19() -> TestResult<()> {
do_test(19).await
@@ -378,7 +371,7 @@ mod test {
];
for table in tables {
ctx.register_parquet(
- table,
+ *table,
&format!("{data_path}/{table}.parquet"),
ParquetReadOptions::default(),
)
diff --git a/src/query_stage.rs b/src/query_stage.rs
index 084cd72..05c090b 100644
--- a/src/query_stage.rs
+++ b/src/query_stage.rs
@@ -18,7 +18,7 @@
use crate::context::serialize_execution_plan;
use crate::shuffle::{ShuffleCodec, ShuffleReaderExec};
use datafusion::error::Result;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning};
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec;
use pyo3::prelude::*;
@@ -99,10 +99,14 @@ impl QueryStage {
/// Get the input partition count. This is the same as the number of concurrent tasks
/// when we schedule this query stage for execution
pub fn get_input_partition_count(&self) -> usize {
- self.plan.children()[0]
- .properties()
- .output_partitioning()
- .partition_count()
+ if self.plan.children().is_empty() {
+ // leaf node (file scan)
+ self.plan.output_partitioning().partition_count()
+ } else {
+ self.plan.children()[0]
+ .output_partitioning()
+ .partition_count()
+ }
}
pub fn get_output_partition_count(&self) -> usize {
diff --git a/testdata/expected-plans/q1.txt b/testdata/expected-plans/q1.txt
index 8eaff99..282d5da 100644
--- a/testdata/expected-plans/q1.txt
+++ b/testdata/expected-plans/q1.txt
@@ -11,8 +11,8 @@ Sort: lineitem.l_returnflag ASC NULLS LAST,
lineitem.l_linestatus ASC NULLS LAST
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC
NULLS LAST]
- SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS
LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC
NULLS LAST]
+ SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS
LAST], preserve_partitioning=[true]
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as
l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty,
sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice
* Int64(1) - lineitem.l_discount)@4 as sum_disc_price,
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) +
lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty,
avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as
avg_d [...]
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as
l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity),
sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity),
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
CoalesceBatchesExec: target_batch_size=8192
@@ -36,13 +36,13 @@ ShuffleWriterExec(stage_id=0,
output_partitioning=Hash([Column { name: "l_return
Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name:
"l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
- SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS
LAST], preserve_partitioning=[true]
+ SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS
LAST], preserve_partitioning=[true]
ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as
l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty,
sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice
* Int64(1) - lineitem.l_discount)@4 as sum_disc_price,
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) +
lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty,
avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as
avg_d [...]
AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as
l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity),
sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity),
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column {
name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }],
2))
Query Stage #2 (2 -> 1):
-SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC
NULLS LAST]
+SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC
NULLS LAST]
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name:
"l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
diff --git a/testdata/expected-plans/q10.txt b/testdata/expected-plans/q10.txt
index 916dcbb..046f69e 100644
--- a/testdata/expected-plans/q10.txt
+++ b/testdata/expected-plans/q10.txt
@@ -15,8 +15,8 @@ Sort: revenue DESC NULLS FIRST, fetch=20
Filter: orders.o_orderdate >= Date32("1993-07-01") AND
orders.o_orderdate < Date32("1993-10-01")
TableScan: orders projection=[o_orderkey, o_custkey,
o_orderdate], partial_filters=[orders.o_orderdate >= Date32("1993-07-01"),
orders.o_orderdate < Date32("1993-10-01")]
Projection: lineitem.l_orderkey, lineitem.l_extendedprice,
lineitem.l_discount
- Filter: lineitem.l_returnflag = Utf8("R")
- TableScan: lineitem projection=[l_orderkey, l_extendedprice,
l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8("R")]
+ Filter: lineitem.l_returnflag = Utf8View("R")
+ TableScan: lineitem projection=[l_orderkey, l_extendedprice,
l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag =
Utf8View("R")]
TableScan: nation projection=[n_nationkey, n_name]
DataFusion Physical Plan
diff --git a/testdata/expected-plans/q11.txt b/testdata/expected-plans/q11.txt
index 4478944..74f74d7 100644
--- a/testdata/expected-plans/q11.txt
+++ b/testdata/expected-plans/q11.txt
@@ -12,8 +12,8 @@ Sort: value DESC NULLS FIRST
TableScan: partsupp projection=[ps_partkey, ps_suppkey,
ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Projection: nation.n_nationkey
- Filter: nation.n_name = Utf8("ALGERIA")
- TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8("ALGERIA")]
+ Filter: nation.n_name = Utf8View("ALGERIA")
+ TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("ALGERIA")]
SubqueryAlias: __scalar_sq_1
Projection: CAST(CAST(sum(partsupp.ps_supplycost *
partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15))
Aggregate: groupBy=[[]], aggr=[[sum(partsupp.ps_supplycost *
CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
@@ -24,8 +24,8 @@ Sort: value DESC NULLS FIRST
TableScan: partsupp projection=[ps_suppkey, ps_availqty,
ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Projection: nation.n_nationkey
- Filter: nation.n_name = Utf8("ALGERIA")
- TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8("ALGERIA")]
+ Filter: nation.n_name = Utf8View("ALGERIA")
+ TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("ALGERIA")]
DataFusion Physical Plan
========================
@@ -42,9 +42,9 @@ SortPreservingMergeExec: [value@1 DESC]
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(n_nationkey@0, s_nationkey@2)], projection=[ps_availqty@1, ps_supplycost@2]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_nationkey@0], 2),
input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: n_name@1 = ALGERIA,
projection=[n_nationkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: n_name@1 = ALGERIA,
projection=[n_nationkey@0]
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... },
projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA,
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false
ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END,
required_guarantees=[n_name in (ALGERIA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([s_nationkey@2], 2),
input_partitions=2
@@ -66,9 +66,9 @@ SortPreservingMergeExec: [value@1 DESC]
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(n_nationkey@0, s_nationkey@3)], projection=[ps_partkey@1, ps_availqty@2,
ps_supplycost@3]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_nationkey@0], 2),
input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: n_name@1 = ALGERIA,
projection=[n_nationkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: n_name@1 = ALGERIA,
projection=[n_nationkey@0]
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... },
projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA,
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false
ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END,
required_guarantees=[n_name in (ALGERIA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([s_nationkey@3], 2),
input_partitions=2
diff --git a/testdata/expected-plans/q12.txt b/testdata/expected-plans/q12.txt
index f2052fb..c7ae269 100644
--- a/testdata/expected-plans/q12.txt
+++ b/testdata/expected-plans/q12.txt
@@ -3,13 +3,13 @@ DataFusion Logical Plan
Sort: lineitem.l_shipmode ASC NULLS LAST
Projection: lineitem.l_shipmode, sum(CASE WHEN orders.o_orderpriority =
Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE
Int64(0) END) AS high_line_count, sum(CASE WHEN orders.o_orderpriority !=
Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1)
ELSE Int64(0) END) AS low_line_count
- Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN
orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority =
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN
orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority !=
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
+ Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN
orders.o_orderpriority = Utf8View("1-URGENT") OR orders.o_orderpriority =
Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHEN
orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority =
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN
orders.o_orderpriority != Utf8View("1-URGENT") AND orders.o_orderpriority !=
Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHEN o [...]
Projection: orders.o_orderpriority, lineitem.l_shipmode
Inner Join: orders.o_orderkey = lineitem.l_orderkey
TableScan: orders projection=[o_orderkey, o_orderpriority]
Projection: lineitem.l_orderkey, lineitem.l_shipmode
- Filter: (lineitem.l_shipmode = Utf8("FOB") OR lineitem.l_shipmode
= Utf8("SHIP")) AND lineitem.l_receiptdate > lineitem.l_commitdate AND
lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >=
Date32("1995-01-01") AND lineitem.l_receiptdate < Date32("1996-01-01")
- TableScan: lineitem projection=[l_orderkey, l_shipdate,
l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode
= Utf8("FOB") OR lineitem.l_shipmode = Utf8("SHIP"), lineitem.l_receiptdate >
lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate,
lineitem.l_receiptdate >= Date32("1995-01-01"), lineitem.l_receiptdate <
Date32("1996-01-01")]
+ Filter: (lineitem.l_shipmode = Utf8View("FOB") OR
lineitem.l_shipmode = Utf8View("SHIP")) AND lineitem.l_receiptdate >
lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND
lineitem.l_receiptdate >= Date32("1995-01-01") AND lineitem.l_receiptdate <
Date32("1996-01-01")
+ TableScan: lineitem projection=[l_orderkey, l_shipdate,
l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode
= Utf8View("FOB") OR lineitem.l_shipmode = Utf8View("SHIP"),
lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate <
lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("1995-01-01"),
lineitem.l_receiptdate < Date32("1996-01-01")]
DataFusion Physical Plan
========================
@@ -28,7 +28,7 @@ SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
RepartitionExec: partitioning=Hash([l_orderkey@0], 2),
input_partitions=2
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 =
SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2
AND l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01,
projection=[l_orderkey@0, l_shipmode@4]
- ParquetExec: file_groups={ ... },
projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode],
predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 >
l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >=
1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN
l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE
l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END O [...]
+ ParquetExec: file_groups={ ... },
projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode],
predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 >
l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >=
1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN
l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE
l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END O [...]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([o_orderkey@0], 2),
input_partitions=2
ParquetExec: file_groups={ ... },
projection=[o_orderkey, o_orderpriority]
@@ -40,7 +40,7 @@ Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name:
"l_orderkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 = SHIP) AND
l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND
l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01,
projection=[l_orderkey@0, l_shipmode@4]
- ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate,
l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR
l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10
< l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 <
1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 =
l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <=
l_shipmode_max@1 END OR CASE WHEN l_shipmode [...]
+ ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate,
l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR
l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10
< l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 <
1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 =
l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <=
l_shipmode_max@1 END OR CASE WHEN l_shipmode [...]
Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name:
"o_orderkey", index: 0 }], 2))
diff --git a/testdata/expected-plans/q13.txt b/testdata/expected-plans/q13.txt
index 691f45e..366db12 100644
--- a/testdata/expected-plans/q13.txt
+++ b/testdata/expected-plans/q13.txt
@@ -11,14 +11,14 @@ Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC
NULLS FIRST
Left Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey]
Projection: orders.o_orderkey, orders.o_custkey
- Filter: orders.o_comment NOT LIKE Utf8("%express%requests%")
- TableScan: orders projection=[o_orderkey, o_custkey,
o_comment], partial_filters=[orders.o_comment NOT LIKE
Utf8("%express%requests%")]
+ Filter: orders.o_comment NOT LIKE
Utf8View("%express%requests%")
+ TableScan: orders projection=[o_orderkey, o_custkey,
o_comment], partial_filters=[orders.o_comment NOT LIKE
Utf8View("%express%requests%")]
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
- SortExec: expr=[custdist@1 DESC,c_count@0 DESC], preserve_partitioning=[true]
+SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC]
+ SortExec: expr=[custdist@1 DESC, c_count@0 DESC],
preserve_partitioning=[true]
ProjectionExec: expr=[c_count@0 as c_count, count(*)@1 as custdist]
AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count],
aggr=[count(*)]
CoalesceBatchesExec: target_batch_size=8192
@@ -64,13 +64,13 @@ ShuffleWriterExec(stage_id=2,
output_partitioning=Hash([Column { name: "c_count"
Query Stage #3 (2 -> 2):
ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name:
"c_count", index: 0 }], 2))
- SortExec: expr=[custdist@1 DESC,c_count@0 DESC], preserve_partitioning=[true]
+ SortExec: expr=[custdist@1 DESC, c_count@0 DESC],
preserve_partitioning=[true]
ProjectionExec: expr=[c_count@0 as c_count, count(*)@1 as custdist]
AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count],
aggr=[count(*)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column {
name: "c_count", index: 0 }], 2))
Query Stage #4 (2 -> 1):
-SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
+SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC]
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name:
"c_count", index: 0 }], 2))
diff --git a/testdata/expected-plans/q14.txt b/testdata/expected-plans/q14.txt
index 81ef8ef..67d16d6 100644
--- a/testdata/expected-plans/q14.txt
+++ b/testdata/expected-plans/q14.txt
@@ -2,7 +2,7 @@ DataFusion Logical Plan
=======================
Projection: Float64(100) * CAST(sum(CASE WHEN part.p_type LIKE Utf8("PROMO%")
THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0)
END) AS Float64) / CAST(sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount) AS Float64) AS promo_revenue
- Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN part.p_type LIKE
Utf8("PROMO%") THEN __common_expr_1 ELSE Decimal128(Some(0),35,4) END) AS
sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice *
Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) AS
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+ Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN part.p_type LIKE
Utf8View("PROMO%") THEN __common_expr_1 ELSE Decimal128(Some(0),35,4) END) AS
sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice *
Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) AS
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount) AS __common_expr_1, part.p_type
Inner Join: lineitem.l_partkey = part.p_partkey
Projection: lineitem.l_partkey, lineitem.l_extendedprice,
lineitem.l_discount
diff --git a/testdata/expected-plans/q16.txt b/testdata/expected-plans/q16.txt
index 5ef333a..24ecb18 100644
--- a/testdata/expected-plans/q16.txt
+++ b/testdata/expected-plans/q16.txt
@@ -9,18 +9,18 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS
LAST, part.p_type AS
Projection: partsupp.ps_suppkey, part.p_brand, part.p_type,
part.p_size
Inner Join: partsupp.ps_partkey = part.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey]
- Filter: part.p_brand != Utf8("Brand#14") AND part.p_type NOT
LIKE Utf8("SMALL PLATED%") AND part.p_size IN ([Int32(14), Int32(6), Int32(5),
Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])
- TableScan: part projection=[p_partkey, p_brand, p_type,
p_size], partial_filters=[part.p_brand != Utf8("Brand#14"), part.p_type NOT
LIKE Utf8("SMALL PLATED%"), part.p_size IN ([Int32(14), Int32(6), Int32(5),
Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])]
+ Filter: part.p_brand != Utf8View("Brand#14") AND part.p_type NOT
LIKE Utf8View("SMALL PLATED%") AND part.p_size IN ([Int32(14), Int32(6),
Int32(5), Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])
+ TableScan: part projection=[p_partkey, p_brand, p_type,
p_size], partial_filters=[part.p_brand != Utf8View("Brand#14"), part.p_type NOT
LIKE Utf8View("SMALL PLATED%"), part.p_size IN ([Int32(14), Int32(6), Int32(5),
Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])]
SubqueryAlias: __correlated_sq_1
Projection: supplier.s_suppkey
- Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
- TableScan: supplier projection=[s_suppkey, s_comment],
partial_filters=[supplier.s_comment LIKE Utf8("%Customer%Complaints%")]
+ Filter: supplier.s_comment LIKE Utf8View("%Customer%Complaints%")
+ TableScan: supplier projection=[s_suppkey, s_comment],
partial_filters=[supplier.s_comment LIKE Utf8View("%Customer%Complaints%")]
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
- SortExec: expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC
NULLS LAST,p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST,
p_type@1 ASC NULLS LAST, p_size@2 ASC NULLS LAST]
+ SortExec: expr=[supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, p_type@1 ASC
NULLS LAST, p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2
as p_size, count(alias1)@3 as supplier_cnt]
AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand,
p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
CoalesceBatchesExec: target_batch_size=8192
@@ -48,7 +48,7 @@ SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC
NULLS LAST,p_type@1
CoalesceBatchesExec:
target_batch_size=8192
FilterExec: p_brand@1 != Brand#14
AND p_type@2 NOT LIKE SMALL PLATED% AND Use p_size@3 IN (SET) ([Literal {
value: Int32(14) }, Literal { value: Int32(6) }, Literal { value: Int32(5) },
Literal { value: Int32(31) }, Literal { value: Int32(49) }, Literal { value:
Int32(15) }, Literal { value: Int32(41) }, Literal { value: Int32(47) }])
RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
- ParquetExec: file_groups={ ...
}]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3
THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND
(CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE
p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6
= p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5
END OR CASE WHEN p_size_null_count@6 [...]
+ ParquetExec: file_groups={ ...
}]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3
THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND
(CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE
p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6
= p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5
END OR CASE WHEN p_size_null_count@6 [...]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec:
partitioning=Hash([ps_partkey@0], 2), input_partitions=2
ParquetExec: file_groups={ ... },
projection=[ps_partkey, ps_suppkey]
@@ -66,7 +66,7 @@ Query Stage #1 (1 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name:
"p_partkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: p_brand@1 != Brand#14 AND p_type@2 NOT LIKE SMALL PLATED% AND
Use p_size@3 IN (SET) ([Literal { value: Int32(14) }, Literal { value: Int32(6)
}, Literal { value: Int32(5) }, Literal { value: Int32(31) }, Literal { value:
Int32(49) }, Literal { value: Int32(15) }, Literal { value: Int32(41) },
Literal { value: Int32(47) }])
- ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN
p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 !=
Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 =
p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5
END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE
p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 =
p_size_row_count@7 THEN false ELSE p_ [...]
+ ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN
p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 !=
Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 =
p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5
END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE
p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 =
p_size_row_count@7 THEN false ELSE p_ [...]
Query Stage #2 (2 -> 2):
ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name:
"ps_partkey", index: 0 }], 2))
@@ -101,13 +101,13 @@ ShuffleWriterExec(stage_id=5,
output_partitioning=Hash([Column { name: "p_brand"
Query Stage #6 (2 -> 2):
ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name:
"p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name:
"p_size", index: 2 }], 2))
- SortExec: expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC
NULLS LAST,p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
+ SortExec: expr=[supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, p_type@1 ASC
NULLS LAST, p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2
as p_size, count(alias1)@3 as supplier_cnt]
AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand,
p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column {
name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column {
name: "p_size", index: 2 }], 2))
Query Stage #7 (2 -> 1):
-SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
+SortPreservingMergeExec: [supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST,
p_type@1 ASC NULLS LAST, p_size@2 ASC NULLS LAST]
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name:
"p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name:
"p_size", index: 2 }], 2))
diff --git a/testdata/expected-plans/q17.txt b/testdata/expected-plans/q17.txt
index 454f0ad..6006fd6 100644
--- a/testdata/expected-plans/q17.txt
+++ b/testdata/expected-plans/q17.txt
@@ -9,8 +9,8 @@ Projection: CAST(sum(lineitem.l_extendedprice) AS Float64) /
Float64(7) AS avg_y
Inner Join: lineitem.l_partkey = part.p_partkey
TableScan: lineitem projection=[l_partkey, l_quantity,
l_extendedprice]
Projection: part.p_partkey
- Filter: part.p_brand = Utf8("Brand#42") AND part.p_container =
Utf8("LG BAG")
- TableScan: part projection=[p_partkey, p_brand, p_container],
partial_filters=[part.p_brand = Utf8("Brand#42"), part.p_container = Utf8("LG
BAG")]
+ Filter: part.p_brand = Utf8View("Brand#42") AND part.p_container
= Utf8View("LG BAG")
+ TableScan: part projection=[p_partkey, p_brand, p_container],
partial_filters=[part.p_brand = Utf8View("Brand#42"), part.p_container =
Utf8View("LG BAG")]
SubqueryAlias: __scalar_sq_1
Projection: CAST(Float64(0.2) * CAST(avg(lineitem.l_quantity) AS
Float64) AS Decimal128(30, 15)), lineitem.l_partkey
Aggregate: groupBy=[[lineitem.l_partkey]],
aggr=[[avg(lineitem.l_quantity)]]
diff --git a/testdata/expected-plans/q18.txt b/testdata/expected-plans/q18.txt
index 0696af7..30179d0 100644
--- a/testdata/expected-plans/q18.txt
+++ b/testdata/expected-plans/q18.txt
@@ -20,8 +20,8 @@ Sort: orders.o_totalprice DESC NULLS FIRST,
orders.o_orderdate ASC NULLS LAST, f
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST],
fetch=100
- SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC,o_orderdate@3 ASC NULLS
LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST],
fetch=100
+ SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC, o_orderdate@3 ASC
NULLS LAST], preserve_partitioning=[true]
AggregateExec: mode=FinalPartitioned, gby=[c_name@0 as c_name, c_custkey@1
as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@3 as o_orderdate,
o_totalprice@4 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([c_name@0, c_custkey@1,
o_orderkey@2, o_orderdate@3, o_totalprice@4], 2), input_partitions=2
@@ -99,12 +99,12 @@ ShuffleWriterExec(stage_id=5,
output_partitioning=Hash([Column { name: "c_name",
Query Stage #6 (2 -> 2):
ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name:
"c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name:
"o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column {
name: "o_totalprice", index: 4 }], 2))
- SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC,o_orderdate@3 ASC NULLS
LAST], preserve_partitioning=[true]
+ SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC, o_orderdate@3 ASC
NULLS LAST], preserve_partitioning=[true]
AggregateExec: mode=FinalPartitioned, gby=[c_name@0 as c_name, c_custkey@1
as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@3 as o_orderdate,
o_totalprice@4 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name:
"c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name:
"o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column {
name: "o_totalprice", index: 4 }], 2))
Query Stage #7 (2 -> 1):
-SortPreservingMergeExec: [o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST],
fetch=100
+SortPreservingMergeExec: [o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST],
fetch=100
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name:
"c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name:
"o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column {
name: "o_totalprice", index: 4 }], 2))
diff --git a/testdata/expected-plans/q19.txt b/testdata/expected-plans/q19.txt
index c98f39e..c2e9025 100644
--- a/testdata/expected-plans/q19.txt
+++ b/testdata/expected-plans/q19.txt
@@ -4,12 +4,12 @@ DataFusion Logical Plan
Projection: sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS
revenue
Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice *
(Decimal128(Some(1),20,0) - lineitem.l_discount)) AS
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Projection: lineitem.l_extendedprice, lineitem.l_discount
- Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand =
Utf8("Brand#21") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"),
Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >=
Decimal128(Some(800),11,2) AND lineitem.l_quantity <=
Decimal128(Some(1800),11,2) AND part.p_size <= Int32(5) OR part.p_brand =
Utf8("Brand#13") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"),
Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quantity >= Decimal128 [...]
+ Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand =
Utf8View("Brand#21") AND part.p_container IN ([Utf8View("SM CASE"),
Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND
lineitem.l_quantity >= Decimal128(Some(800),11,2) AND lineitem.l_quantity <=
Decimal128(Some(1800),11,2) AND part.p_size <= Int32(5) OR part.p_brand =
Utf8View("Brand#13") AND part.p_container IN ([Utf8View("MED BAG"),
Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK") [...]
Projection: lineitem.l_partkey, lineitem.l_quantity,
lineitem.l_extendedprice, lineitem.l_discount
- Filter: (lineitem.l_quantity >= Decimal128(Some(800),11,2) AND
lineitem.l_quantity <= Decimal128(Some(1800),11,2) OR lineitem.l_quantity >=
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <=
Decimal128(Some(3000),11,2) OR lineitem.l_quantity >=
Decimal128(Some(3000),11,2) AND lineitem.l_quantity <=
Decimal128(Some(4000),11,2)) AND (lineitem.l_shipmode = Utf8("AIR") OR
lineitem.l_shipmode = Utf8("AIR REG")) AND lineitem.l_shipinstruct =
Utf8("DELIVER IN PERSON")
- TableScan: lineitem projection=[l_partkey, l_quantity,
l_extendedprice, l_discount, l_shipinstruct, l_shipmode],
partial_filters=[lineitem.l_shipmode = Utf8("AIR") OR lineitem.l_shipmode =
Utf8("AIR REG"), lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON"),
lineitem.l_quantity >= Decimal128(Some(800),11,2) AND lineitem.l_quantity <=
Decimal128(Some(1800),11,2) OR lineitem.l_quantity >=
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <=
Decimal128(Some(3000),11,2) OR line [...]
- Filter: (part.p_brand = Utf8("Brand#21") AND part.p_container IN
([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND
part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#13") AND part.p_container
IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND
part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#52") AND
part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"),
Utf8("LG PKG")]) AND part.p_size <= Int32(15)) AND [...]
- TableScan: part projection=[p_partkey, p_brand, p_size,
p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand =
Utf8("Brand#21") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"),
Utf8("SM PACK"), Utf8("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand =
Utf8("Brand#13") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"),
Utf8("MED PKG"), Utf8("MED PACK")]) AND part.p_size <= Int32(10) OR
part.p_brand = Utf8("Brand#52") AND part.p_container I [...]
+ Filter: (lineitem.l_quantity >= Decimal128(Some(800),11,2) AND
lineitem.l_quantity <= Decimal128(Some(1800),11,2) OR lineitem.l_quantity >=
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <=
Decimal128(Some(3000),11,2) OR lineitem.l_quantity >=
Decimal128(Some(3000),11,2) AND lineitem.l_quantity <=
Decimal128(Some(4000),11,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR
lineitem.l_shipmode = Utf8View("AIR REG")) AND lineitem.l_shipinstruct =
Utf8View("DELIVER IN PERSON")
+ TableScan: lineitem projection=[l_partkey, l_quantity,
l_extendedprice, l_discount, l_shipinstruct, l_shipmode],
partial_filters=[lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode =
Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON"),
lineitem.l_quantity >= Decimal128(Some(800),11,2) AND lineitem.l_quantity <=
Decimal128(Some(1800),11,2) OR lineitem.l_quantity >=
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <= Decimal128(Some(3000),1
[...]
+ Filter: (part.p_brand = Utf8View("Brand#21") AND part.p_container IN
([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM
PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#13") AND
part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED
PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand =
Utf8View("Brand#52") AND part.p_container IN ([Utf8View("LG CASE"),
Utf8View("LG BOX"), Utf8View("LG PAC [...]
+ TableScan: part projection=[p_partkey, p_brand, p_size,
p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand =
Utf8View("Brand#21") AND part.p_container IN ([Utf8View("SM CASE"),
Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size
<= Int32(5) OR part.p_brand = Utf8View("Brand#13") AND part.p_container IN
([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED
PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = [...]
DataFusion Physical Plan
========================
@@ -19,18 +19,18 @@ ProjectionExec: expr=[sum(lineitem.l_extendedprice *
Int64(1) - lineitem.l_disco
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice
* Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0,
l_partkey@0)], filter=p_brand@1 = Brand#21 AND Use p_container@3 IN (SET)
([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") },
Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2
<= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value:
Utf8("MED BAG") }, Literal { valu [...]
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0,
l_partkey@0)], filter=p_brand@1 = Brand#21 AND p_container@3 IN ([Literal {
value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") }, Literal {
value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") }]) AND
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2
<= 5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal { value:
Utf8View("MED BAG") }, Literal { valu [...]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([p_partkey@0], 2),
input_partitions=2
CoalesceBatchesExec: target_batch_size=8192
- FilterExec: (p_brand@1 = Brand#21 AND Use p_container@3 IN
(SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") },
Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND
p_size@2 <= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal
{ value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal {
value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <=
10 OR p_brand@1 = Brand#52 [...]
+ FilterExec: (p_brand@1 = Brand#21 AND p_container@3 IN
([Literal { value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX")
}, Literal { value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG")
}]) AND p_size@2 <= 5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal {
value: Utf8View("MED BAG") }, Literal { value: Utf8View("MED BOX") }, Literal {
value: Utf8View("MED PKG") }, Literal { value: Utf8View("MED PACK") }]) AND
p_size@2 <= 10 OR p_brand@1 [...]
RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... }]) AND p_size@5 <= 15),
pruning_predicate=CASE WHEN p_size_null_count@1 = p_size_row_count@2 THEN false
ELSE p_size_max@0 >= 1 END AND (CASE WHEN p_brand_null_count@5 =
p_brand_row_count@6 THEN false ELSE p_brand_min@3 <= Brand#21 AND Brand#21 <=
p_brand_max@4 END AND (CASE WHEN p_container_null_count@9 =
p_container_row_count@10 THEN false ELSE p_container_min@7 <= SM CASE AND SM
CASE <= p_container_max@8 END OR CASE WHEN p_container [...]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([l_partkey@0], 2),
input_partitions=2
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_quantity@1 >= Some(800),11,2 AND l_quantity@1
<= Some(1800),11,2 OR l_quantity@1 >= Some(2000),11,2 AND l_quantity@1 <=
Some(3000),11,2 OR l_quantity@1 >= Some(3000),11,2 AND l_quantity@1 <=
Some(4000),11,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND
l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0, l_quantity@1,
l_extendedprice@2, l_discount@3]
- ParquetExec: file_groups={ ... }, projection=[l_partkey,
l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode],
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN [...]
+ ParquetExec: file_groups={ ... }, projection=[l_partkey,
l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode],
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN [...]
DataFusion Ray Distributed Plan
===========
@@ -38,20 +38,20 @@ DataFusion Ray Distributed Plan
Query Stage #0 (1 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name:
"p_partkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
- FilterExec: (p_brand@1 = Brand#21 AND Use p_container@3 IN (SET) ([Literal
{ value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value:
Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND p_size@2 <= 5 OR
p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value:
Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value:
Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 10 OR
p_brand@1 = Brand#52 AND Use p_cont [...]
+ FilterExec: (p_brand@1 = Brand#21 AND p_container@3 IN ([Literal { value:
Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") }, Literal { value:
Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") }]) AND p_size@2 <=
5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal { value: Utf8View("MED
BAG") }, Literal { value: Utf8View("MED BOX") }, Literal { value: Utf8View("MED
PKG") }, Literal { value: Utf8View("MED PACK") }]) AND p_size@2 <= 10 OR
p_brand@1 = Brand#52 AN [...]
ParquetExec: file_groups={ ... }]) AND p_size@5 <= 15),
pruning_predicate=CASE WHEN p_size_null_count@1 = p_size_row_count@2 THEN false
ELSE p_size_max@0 >= 1 END AND (CASE WHEN p_brand_null_count@5 =
p_brand_row_count@6 THEN false ELSE p_brand_min@3 <= Brand#21 AND Brand#21 <=
p_brand_max@4 END AND (CASE WHEN p_container_null_count@9 =
p_container_row_count@10 THEN false ELSE p_container_min@7 <= SM CASE AND SM
CASE <= p_container_max@8 END OR CASE WHEN p_container_null_count@9 = [...]
Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name:
"l_partkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_quantity@1 >= Some(800),11,2 AND l_quantity@1 <=
Some(1800),11,2 OR l_quantity@1 >= Some(2000),11,2 AND l_quantity@1 <=
Some(3000),11,2 OR l_quantity@1 >= Some(3000),11,2 AND l_quantity@1 <=
Some(4000),11,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND
l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0, l_quantity@1,
l_extendedprice@2, l_discount@3]
- ParquetExec: file_groups={ ... }, projection=[l_partkey, l_quantity,
l_extendedprice, l_discount, l_shipinstruct, l_shipmode],
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN l_shipmode_nu
[...]
+ ParquetExec: file_groups={ ... }, projection=[l_partkey, l_quantity,
l_extendedprice, l_discount, l_shipinstruct, l_shipmode],
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN l_shipmode_nu
[...]
Query Stage #2 (2 -> 1):
ShuffleWriterExec(stage_id=2, output_partitioning=Hash([], 2))
AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice *
Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0,
l_partkey@0)], filter=p_brand@1 = Brand#21 AND Use p_container@3 IN (SET)
([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") },
Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2
<= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value:
Utf8("MED BAG") }, Literal { value: U [...]
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0,
l_partkey@0)], filter=p_brand@1 = Brand#21 AND p_container@3 IN ([Literal {
value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") }, Literal {
value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") }]) AND
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2
<= 5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal { value:
Utf8View("MED BAG") }, Literal { value: U [...]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column {
name: "p_partkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
diff --git a/testdata/expected-plans/q2.txt b/testdata/expected-plans/q2.txt
index cb67479..bc0713c 100644
--- a/testdata/expected-plans/q2.txt
+++ b/testdata/expected-plans/q2.txt
@@ -13,14 +13,14 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name
ASC NULLS LAST, supplie
Projection: part.p_partkey, part.p_mfgr,
partsupp.ps_suppkey, partsupp.ps_supplycost
Inner Join: part.p_partkey = partsupp.ps_partkey
Projection: part.p_partkey, part.p_mfgr
- Filter: part.p_size = Int32(48) AND part.p_type LIKE
Utf8("%TIN")
- TableScan: part projection=[p_partkey, p_mfgr,
p_type, p_size], partial_filters=[part.p_size = Int32(48), part.p_type LIKE
Utf8("%TIN")]
+ Filter: part.p_size = Int32(48) AND part.p_type LIKE
Utf8View("%TIN")
+ TableScan: part projection=[p_partkey, p_mfgr,
p_type, p_size], partial_filters=[part.p_size = Int32(48), part.p_type LIKE
Utf8View("%TIN")]
TableScan: partsupp projection=[ps_partkey, ps_suppkey,
ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_name,
s_address, s_nationkey, s_phone, s_acctbal, s_comment]
TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
Projection: region.r_regionkey
- Filter: region.r_name = Utf8("ASIA")
- TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8("ASIA")]
+ Filter: region.r_name = Utf8View("ASIA")
+ TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8View("ASIA")]
SubqueryAlias: __scalar_sq_1
Projection: min(partsupp.ps_supplycost), partsupp.ps_partkey
Aggregate: groupBy=[[partsupp.ps_partkey]],
aggr=[[min(partsupp.ps_supplycost)]]
@@ -34,14 +34,14 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name
ASC NULLS LAST, supplie
TableScan: supplier projection=[s_suppkey, s_nationkey]
TableScan: nation projection=[n_nationkey, n_regionkey]
Projection: region.r_regionkey
- Filter: region.r_name = Utf8("ASIA")
- TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8("ASIA")]
+ Filter: region.r_name = Utf8View("ASIA")
+ TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8View("ASIA")]
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1
ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], fetch=100
- SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS
LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST],
preserve_partitioning=[true]
+SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1
ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=100
+ SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST,
s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST],
preserve_partitioning=[true]
ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name,
n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3
as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0,
ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)],
projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4,
s_acctbal@5, s_comment@6, n_name@8]
@@ -51,9 +51,9 @@ SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS
LAST,s_name@1 ASC
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(r_regionkey@0, n_regionkey@9)], projection=[p_partkey@1, p_mfgr@2,
s_name@3, s_address@4, s_phone@5, s_acctbal@6, s_comment@7, ps_supplycost@8,
n_name@9]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([r_regionkey@0], 2),
input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: r_name@1 = ASIA,
projection=[r_regionkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: r_name@1 = ASIA, projection=[r_regionkey@0]
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... },
projection=[r_regionkey, r_name], predicate=r_name@1 = ASIA,
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false
ELSE r_name_min@0 <= ASIA AND ASIA <= r_name_max@1 END,
required_guarantees=[r_name in (ASIA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_regionkey@9], 2),
input_partitions=2
@@ -96,9 +96,9 @@ SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS
LAST,s_name@1 ASC
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(r_regionkey@0, n_regionkey@2)], projection=[ps_partkey@1, ps_supplycost@2]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec:
partitioning=Hash([r_regionkey@0], 2), input_partitions=2
- RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: r_name@1 = ASIA,
projection=[r_regionkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: r_name@1 = ASIA,
projection=[r_regionkey@0]
+ RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
ParquetExec: file_groups={ ... },
projection=[r_regionkey, r_name], predicate=r_name@1 = ASIA,
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false
ELSE r_name_min@0 <= ASIA AND ASIA <= r_name_max@1 END,
required_guarantees=[r_name in (ASIA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec:
partitioning=Hash([n_regionkey@2], 2), input_partitions=2
@@ -243,7 +243,7 @@ ShuffleWriterExec(stage_id=16,
output_partitioning=Hash([Column { name: "ps_part
Query Stage #17 (2 -> 2):
ShuffleWriterExec(stage_id=17, output_partitioning=Hash([Column { name:
"p_partkey", index: 3 }], 2))
- SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS
LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST],
preserve_partitioning=[true]
+ SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST,
s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST],
preserve_partitioning=[true]
ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name,
n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3
as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0,
ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)],
projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4,
s_acctbal@5, s_comment@6, n_name@8]
@@ -253,6 +253,6 @@ ShuffleWriterExec(stage_id=17,
output_partitioning=Hash([Column { name: "p_partk
ShuffleReaderExec(stage_id=16, input_partitioning=Hash([Column {
name: "ps_partkey", index: 1 }, Column { name: "min(partsupp.ps_supplycost)",
index: 0 }], 2))
Query Stage #18 (2 -> 1):
-SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1
ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], fetch=100
+SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1
ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=100
ShuffleReaderExec(stage_id=17, input_partitioning=Hash([Column { name:
"p_partkey", index: 3 }], 2))
diff --git a/testdata/expected-plans/q20.txt b/testdata/expected-plans/q20.txt
index 5473093..13b21c8 100644
--- a/testdata/expected-plans/q20.txt
+++ b/testdata/expected-plans/q20.txt
@@ -3,22 +3,22 @@ DataFusion Logical Plan
Sort: supplier.s_name ASC NULLS LAST
Projection: supplier.s_name, supplier.s_address
- LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
+ LeftSemi Join: supplier.s_suppkey = __correlated_sq_2.ps_suppkey
Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address
Inner Join: supplier.s_nationkey = nation.n_nationkey
TableScan: supplier projection=[s_suppkey, s_name, s_address,
s_nationkey]
Projection: nation.n_nationkey
- Filter: nation.n_name = Utf8("KENYA")
- TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8("KENYA")]
- SubqueryAlias: __correlated_sq_1
+ Filter: nation.n_name = Utf8View("KENYA")
+ TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("KENYA")]
+ SubqueryAlias: __correlated_sq_2
Projection: partsupp.ps_suppkey
Inner Join: partsupp.ps_partkey = __scalar_sq_3.l_partkey,
partsupp.ps_suppkey = __scalar_sq_3.l_suppkey Filter: CAST(partsupp.ps_availqty
AS Float64) > __scalar_sq_3.Float64(0.5) * sum(lineitem.l_quantity)
- LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
+ LeftSemi Join: partsupp.ps_partkey = __correlated_sq_1.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey,
ps_availqty]
- SubqueryAlias: __correlated_sq_2
+ SubqueryAlias: __correlated_sq_1
Projection: part.p_partkey
- Filter: part.p_name LIKE Utf8("blanched%")
- TableScan: part projection=[p_partkey, p_name],
partial_filters=[part.p_name LIKE Utf8("blanched%")]
+ Filter: part.p_name LIKE Utf8View("blanched%")
+ TableScan: part projection=[p_partkey, p_name],
partial_filters=[part.p_name LIKE Utf8View("blanched%")]
SubqueryAlias: __scalar_sq_3
Projection: Float64(0.5) * CAST(sum(lineitem.l_quantity) AS
Float64), lineitem.l_partkey, lineitem.l_suppkey
Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]],
aggr=[[sum(lineitem.l_quantity)]]
@@ -39,9 +39,9 @@ SortPreservingMergeExec: [s_name@0 ASC NULLS LAST]
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(n_nationkey@0, s_nationkey@3)], projection=[s_suppkey@1, s_name@2,
s_address@3]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_nationkey@0], 2),
input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: n_name@1 = KENYA,
projection=[n_nationkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: n_name@1 = KENYA, projection=[n_nationkey@0]
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... },
projection=[n_nationkey, n_name], predicate=n_name@1 = KENYA,
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false
ELSE n_name_min@0 <= KENYA AND KENYA <= n_name_max@1 END,
required_guarantees=[n_name in (KENYA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([s_nationkey@3], 2),
input_partitions=2
diff --git a/testdata/expected-plans/q21.txt b/testdata/expected-plans/q21.txt
index dbd5e97..b88bccc 100644
--- a/testdata/expected-plans/q21.txt
+++ b/testdata/expected-plans/q21.txt
@@ -19,11 +19,11 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS
LAST, fetch=100
Filter: lineitem.l_receiptdate >
lineitem.l_commitdate
TableScan: lineitem projection=[l_orderkey,
l_suppkey, l_commitdate, l_receiptdate],
partial_filters=[lineitem.l_receiptdate > lineitem.l_commitdate]
Projection: orders.o_orderkey
- Filter: orders.o_orderstatus = Utf8("F")
- TableScan: orders projection=[o_orderkey,
o_orderstatus], partial_filters=[orders.o_orderstatus = Utf8("F")]
+ Filter: orders.o_orderstatus = Utf8View("F")
+ TableScan: orders projection=[o_orderkey,
o_orderstatus], partial_filters=[orders.o_orderstatus = Utf8View("F")]
Projection: nation.n_nationkey
- Filter: nation.n_name = Utf8("ARGENTINA")
- TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8("ARGENTINA")]
+ Filter: nation.n_name = Utf8View("ARGENTINA")
+ TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("ARGENTINA")]
SubqueryAlias: __correlated_sq_1
SubqueryAlias: l2
TableScan: lineitem projection=[l_orderkey, l_suppkey]
@@ -36,8 +36,8 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS
LAST, fetch=100
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS LAST], fetch=100
- SortExec: TopK(fetch=100), expr=[numwait@1 DESC,s_name@0 ASC NULLS LAST],
preserve_partitioning=[true]
+SortPreservingMergeExec: [numwait@1 DESC, s_name@0 ASC NULLS LAST], fetch=100
+ SortExec: TopK(fetch=100), expr=[numwait@1 DESC, s_name@0 ASC NULLS LAST],
preserve_partitioning=[true]
ProjectionExec: expr=[s_name@0 as s_name, count(*)@1 as numwait]
AggregateExec: mode=FinalPartitioned, gby=[s_name@0 as s_name],
aggr=[count(*)]
CoalesceBatchesExec: target_batch_size=8192
@@ -53,9 +53,9 @@ SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS
LAST], fetch=100
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(n_nationkey@0, s_nationkey@1)], projection=[s_name@1, l_orderkey@3,
l_suppkey@4]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec:
partitioning=Hash([n_nationkey@0], 2), input_partitions=2
- RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: n_name@1 = ARGENTINA,
projection=[n_nationkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: n_name@1 = ARGENTINA,
projection=[n_nationkey@0]
+ RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
ParquetExec: file_groups={ ... },
projection=[n_nationkey, n_name], predicate=n_name@1 = ARGENTINA,
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false
ELSE n_name_min@0 <= ARGENTINA AND ARGENTINA <= n_name_max@1 END,
required_guarantees=[n_name in (ARGENTINA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec:
partitioning=Hash([s_nationkey@1], 2), input_partitions=2
@@ -166,13 +166,13 @@ ShuffleWriterExec(stage_id=9,
output_partitioning=Hash([Column { name: "s_name",
Query Stage #10 (2 -> 2):
ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name:
"s_name", index: 0 }], 2))
- SortExec: TopK(fetch=100), expr=[numwait@1 DESC,s_name@0 ASC NULLS LAST],
preserve_partitioning=[true]
+ SortExec: TopK(fetch=100), expr=[numwait@1 DESC, s_name@0 ASC NULLS LAST],
preserve_partitioning=[true]
ProjectionExec: expr=[s_name@0 as s_name, count(*)@1 as numwait]
AggregateExec: mode=FinalPartitioned, gby=[s_name@0 as s_name],
aggr=[count(*)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=9, input_partitioning=Hash([Column {
name: "s_name", index: 0 }], 2))
Query Stage #11 (2 -> 1):
-SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS LAST], fetch=100
+SortPreservingMergeExec: [numwait@1 DESC, s_name@0 ASC NULLS LAST], fetch=100
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name:
"s_name", index: 0 }], 2))
diff --git a/testdata/expected-plans/q22.txt b/testdata/expected-plans/q22.txt
index d46d5d5..da693fb 100644
--- a/testdata/expected-plans/q22.txt
+++ b/testdata/expected-plans/q22.txt
@@ -9,15 +9,15 @@ Sort: custsale.cntrycode ASC NULLS LAST
Inner Join: Filter: CAST(customer.c_acctbal AS Decimal128(15, 6)) >
__scalar_sq_2.avg(customer.c_acctbal)
Projection: customer.c_phone, customer.c_acctbal
LeftAnti Join: customer.c_custkey = __correlated_sq_1.o_custkey
- Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN
([Utf8("24"), Utf8("34"), Utf8("16"), Utf8("30"), Utf8("33"), Utf8("14"),
Utf8("13")])
- TableScan: customer projection=[c_custkey, c_phone,
c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN
([Utf8("24"), Utf8("34"), Utf8("16"), Utf8("30"), Utf8("33"), Utf8("14"),
Utf8("13")])]
+ Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN
([Utf8View("24"), Utf8View("34"), Utf8View("16"), Utf8View("30"),
Utf8View("33"), Utf8View("14"), Utf8View("13")])
+ TableScan: customer projection=[c_custkey, c_phone,
c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN
([Utf8View("24"), Utf8View("34"), Utf8View("16"), Utf8View("30"),
Utf8View("33"), Utf8View("14"), Utf8View("13")])]
SubqueryAlias: __correlated_sq_1
TableScan: orders projection=[o_custkey]
SubqueryAlias: __scalar_sq_2
Aggregate: groupBy=[[]], aggr=[[avg(customer.c_acctbal)]]
Projection: customer.c_acctbal
- Filter: customer.c_acctbal > Decimal128(Some(0),11,2) AND
substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("24"), Utf8("34"),
Utf8("16"), Utf8("30"), Utf8("33"), Utf8("14"), Utf8("13")])
- TableScan: customer projection=[c_phone, c_acctbal],
partial_filters=[customer.c_acctbal > Decimal128(Some(0),11,2) AS
customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone,
Int64(1), Int64(2)) IN ([Utf8("24"), Utf8("34"), Utf8("16"), Utf8("30"),
Utf8("33"), Utf8("14"), Utf8("13")]), customer.c_acctbal >
Decimal128(Some(0),11,2)]
+ Filter: customer.c_acctbal > Decimal128(Some(0),11,2) AND
substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("24"),
Utf8View("34"), Utf8View("16"), Utf8View("30"), Utf8View("33"), Utf8View("14"),
Utf8View("13")])
+ TableScan: customer projection=[c_phone, c_acctbal],
partial_filters=[customer.c_acctbal > Decimal128(Some(0),11,2),
substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("24"),
Utf8View("34"), Utf8View("16"), Utf8View("30"), Utf8View("33"), Utf8View("14"),
Utf8View("13")])]
DataFusion Physical Plan
========================
@@ -35,14 +35,14 @@ SortPreservingMergeExec: [cntrycode@0 ASC NULLS LAST]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[],
aggr=[avg(customer.c_acctbal)]
CoalesceBatchesExec: target_batch_size=8192
- FilterExec: c_acctbal@1 > Some(0),11,2 AND Use
substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("24") }, Literal {
value: Utf8("34") }, Literal { value: Utf8("16") }, Literal { value: Utf8("30")
}, Literal { value: Utf8("33") }, Literal { value: Utf8("14") }, Literal {
value: Utf8("13") }]), projection=[c_acctbal@1]
- ParquetExec: file_groups={ ... }]) AND c_acctbal@5
> Some(0),11,2, pruning_predicate=CASE WHEN c_acctbal_null_count@1 =
c_acctbal_row_count@2 THEN false ELSE c_acctbal_max@0 > Some(0),11,2 END AND
CASE WHEN c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN false ELSE
c_acctbal_max@0 > Some(0),11,2 END, required_guarantees=[]
+ FilterExec: c_acctbal@1 > Some(0),11,2 AND
substr(c_phone@0, 1, 2) IN ([Literal { value: Utf8View("24") }, Literal {
value: Utf8View("34") }, Literal { value: Utf8View("16") }, Literal { value:
Utf8View("30") }, Literal { value: Utf8View("33") }, Literal { value:
Utf8View("14") }, Literal { value: Utf8View("13") }]), projection=[c_acctbal@1]
+ ParquetExec: file_groups={ ... }]),
pruning_predicate=CASE WHEN c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN
false ELSE c_acctbal_max@0 > Some(0),11,2 END, required_guarantees=[]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=LeftAnti,
on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([c_custkey@0], 2),
input_partitions=2
CoalesceBatchesExec: target_batch_size=8192
- FilterExec: Use substr(c_phone@1, 1, 2) IN (SET)
([Literal { value: Utf8("24") }, Literal { value: Utf8("34") }, Literal {
value: Utf8("16") }, Literal { value: Utf8("30") }, Literal { value: Utf8("33")
}, Literal { value: Utf8("14") }, Literal { value: Utf8("13") }])
+ FilterExec: substr(c_phone@1, 1, 2) IN ([Literal {
value: Utf8View("24") }, Literal { value: Utf8View("34") }, Literal { value:
Utf8View("16") }, Literal { value: Utf8View("30") }, Literal { value:
Utf8View("33") }, Literal { value: Utf8View("14") }, Literal { value:
Utf8View("13") }])
ParquetExec: file_groups={ ... }])
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([o_custkey@0], 2),
input_partitions=2
@@ -55,13 +55,13 @@ Query Stage #0 (2 -> 1):
ShuffleWriterExec(stage_id=0, output_partitioning=UnknownPartitioning(2))
AggregateExec: mode=Partial, gby=[], aggr=[avg(customer.c_acctbal)]
CoalesceBatchesExec: target_batch_size=8192
- FilterExec: c_acctbal@1 > Some(0),11,2 AND Use substr(c_phone@0, 1, 2)
IN (SET) ([Literal { value: Utf8("24") }, Literal { value: Utf8("34") },
Literal { value: Utf8("16") }, Literal { value: Utf8("30") }, Literal { value:
Utf8("33") }, Literal { value: Utf8("14") }, Literal { value: Utf8("13") }]),
projection=[c_acctbal@1]
- ParquetExec: file_groups={ ... }]) AND c_acctbal@5 > Some(0),11,2,
pruning_predicate=CASE WHEN c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN
false ELSE c_acctbal_max@0 > Some(0),11,2 END AND CASE WHEN
c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN false ELSE c_acctbal_max@0
> Some(0),11,2 END, required_guarantees=[]
+ FilterExec: c_acctbal@1 > Some(0),11,2 AND substr(c_phone@0, 1, 2) IN
([Literal { value: Utf8View("24") }, Literal { value: Utf8View("34") }, Literal
{ value: Utf8View("16") }, Literal { value: Utf8View("30") }, Literal { value:
Utf8View("33") }, Literal { value: Utf8View("14") }, Literal { value:
Utf8View("13") }]), projection=[c_acctbal@1]
+ ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN
c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN false ELSE c_acctbal_max@0
> Some(0),11,2 END, required_guarantees=[]
Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name:
"c_custkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
- FilterExec: Use substr(c_phone@1, 1, 2) IN (SET) ([Literal { value:
Utf8("24") }, Literal { value: Utf8("34") }, Literal { value: Utf8("16") },
Literal { value: Utf8("30") }, Literal { value: Utf8("33") }, Literal { value:
Utf8("14") }, Literal { value: Utf8("13") }])
+ FilterExec: substr(c_phone@1, 1, 2) IN ([Literal { value: Utf8View("24")
}, Literal { value: Utf8View("34") }, Literal { value: Utf8View("16") },
Literal { value: Utf8View("30") }, Literal { value: Utf8View("33") }, Literal {
value: Utf8View("14") }, Literal { value: Utf8View("13") }])
ParquetExec: file_groups={ ... }])
Query Stage #2 (2 -> 2):
diff --git a/testdata/expected-plans/q3.txt b/testdata/expected-plans/q3.txt
index 6fd8791..f9039d3 100644
--- a/testdata/expected-plans/q3.txt
+++ b/testdata/expected-plans/q3.txt
@@ -9,8 +9,8 @@ Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS
LAST, fetch=10
Projection: orders.o_orderkey, orders.o_orderdate,
orders.o_shippriority
Inner Join: customer.c_custkey = orders.o_custkey
Projection: customer.c_custkey
- Filter: customer.c_mktsegment = Utf8("BUILDING")
- TableScan: customer projection=[c_custkey, c_mktsegment],
partial_filters=[customer.c_mktsegment = Utf8("BUILDING")]
+ Filter: customer.c_mktsegment = Utf8View("BUILDING")
+ TableScan: customer projection=[c_custkey, c_mktsegment],
partial_filters=[customer.c_mktsegment = Utf8View("BUILDING")]
Filter: orders.o_orderdate < Date32("1995-03-15")
TableScan: orders projection=[o_orderkey, o_custkey,
o_orderdate, o_shippriority], partial_filters=[orders.o_orderdate <
Date32("1995-03-15")]
Projection: lineitem.l_orderkey, lineitem.l_extendedprice,
lineitem.l_discount
@@ -20,8 +20,8 @@ Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS
LAST, fetch=10
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST],
fetch=10
- SortExec: TopK(fetch=10), expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS
LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST],
fetch=10
+ SortExec: TopK(fetch=10), expr=[revenue@1 DESC, o_orderdate@2 ASC NULLS
LAST], preserve_partitioning=[true]
ProjectionExec: expr=[l_orderkey@0 as l_orderkey,
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue,
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey,
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority],
aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
@@ -91,13 +91,13 @@ ShuffleWriterExec(stage_id=4,
output_partitioning=Hash([Column { name: "l_orderk
Query Stage #5 (2 -> 2):
ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name:
"l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column {
name: "o_shippriority", index: 3 }], 2))
- SortExec: TopK(fetch=10), expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS
LAST], preserve_partitioning=[true]
+ SortExec: TopK(fetch=10), expr=[revenue@1 DESC, o_orderdate@2 ASC NULLS
LAST], preserve_partitioning=[true]
ProjectionExec: expr=[l_orderkey@0 as l_orderkey,
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue,
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey,
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority],
aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=4, input_partitioning=Hash([Column {
name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 },
Column { name: "o_shippriority", index: 2 }], 2))
Query Stage #6 (2 -> 1):
-SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST],
fetch=10
+SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST],
fetch=10
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name:
"l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column {
name: "o_shippriority", index: 3 }], 2))
diff --git a/testdata/expected-plans/q5.txt b/testdata/expected-plans/q5.txt
index 5351e06..2bacb27 100644
--- a/testdata/expected-plans/q5.txt
+++ b/testdata/expected-plans/q5.txt
@@ -22,8 +22,8 @@ Sort: revenue DESC NULLS FIRST
TableScan: supplier projection=[s_suppkey, s_nationkey]
TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
Projection: region.r_regionkey
- Filter: region.r_name = Utf8("AFRICA")
- TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8("AFRICA")]
+ Filter: region.r_name = Utf8View("AFRICA")
+ TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8View("AFRICA")]
DataFusion Physical Plan
========================
@@ -39,9 +39,9 @@ SortPreservingMergeExec: [revenue@1 DESC]
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(r_regionkey@0, n_regionkey@3)], projection=[l_extendedprice@1,
l_discount@2, n_name@3]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([r_regionkey@0], 2),
input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: r_name@1 = AFRICA,
projection=[r_regionkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: r_name@1 = AFRICA,
projection=[r_regionkey@0]
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... },
projection=[r_regionkey, r_name], predicate=r_name@1 = AFRICA,
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false
ELSE r_name_min@0 <= AFRICA AND AFRICA <= r_name_max@1 END,
required_guarantees=[r_name in (AFRICA)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_regionkey@3], 2),
input_partitions=2
diff --git a/testdata/expected-plans/q7.txt b/testdata/expected-plans/q7.txt
index b9e261a..43bc031 100644
--- a/testdata/expected-plans/q7.txt
+++ b/testdata/expected-plans/q7.txt
@@ -6,7 +6,7 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation
ASC NULLS LAST,
Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation,
shipping.l_year]], aggr=[[sum(shipping.volume)]]
SubqueryAlias: shipping
Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation,
date_part(Utf8("YEAR"), lineitem.l_shipdate) AS l_year,
lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS
volume
- Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name
= Utf8("GERMANY") AND n2.n_name = Utf8("IRAQ") OR n1.n_name = Utf8("IRAQ") AND
n2.n_name = Utf8("GERMANY")
+ Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name
= Utf8View("GERMANY") AND n2.n_name = Utf8View("IRAQ") OR n1.n_name =
Utf8View("IRAQ") AND n2.n_name = Utf8View("GERMANY")
Projection: lineitem.l_extendedprice, lineitem.l_discount,
lineitem.l_shipdate, customer.c_nationkey, n1.n_name
Inner Join: supplier.s_nationkey = n1.n_nationkey
Projection: supplier.s_nationkey, lineitem.l_extendedprice,
lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey
@@ -21,17 +21,17 @@ Sort: shipping.supp_nation ASC NULLS LAST,
shipping.cust_nation ASC NULLS LAST,
TableScan: orders projection=[o_orderkey, o_custkey]
TableScan: customer projection=[c_custkey, c_nationkey]
SubqueryAlias: n1
- Filter: nation.n_name = Utf8("GERMANY") OR nation.n_name =
Utf8("IRAQ")
- TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8("GERMANY") OR nation.n_name =
Utf8("IRAQ")]
+ Filter: nation.n_name = Utf8View("GERMANY") OR nation.n_name
= Utf8View("IRAQ")
+ TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("GERMANY") OR nation.n_name =
Utf8View("IRAQ")]
SubqueryAlias: n2
- Filter: nation.n_name = Utf8("IRAQ") OR nation.n_name =
Utf8("GERMANY")
- TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8("IRAQ") OR nation.n_name =
Utf8("GERMANY")]
+ Filter: nation.n_name = Utf8View("IRAQ") OR nation.n_name =
Utf8View("GERMANY")
+ TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("IRAQ") OR nation.n_name =
Utf8View("GERMANY")]
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS
LAST,l_year@2 ASC NULLS LAST]
- SortExec: expr=[supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS
LAST,l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC
NULLS LAST, l_year@2 ASC NULLS LAST]
+ SortExec: expr=[supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC NULLS LAST,
l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[supp_nation@0 as supp_nation, cust_nation@1 as
cust_nation, l_year@2 as l_year, sum(shipping.volume)@3 as revenue]
AggregateExec: mode=FinalPartitioned, gby=[supp_nation@0 as supp_nation,
cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[sum(shipping.volume)]
CoalesceBatchesExec: target_batch_size=8192
@@ -42,9 +42,9 @@ SortPreservingMergeExec: [supp_nation@0 ASC NULLS
LAST,cust_nation@1 ASC NULLS L
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(n_nationkey@0, c_nationkey@3)], filter=n_name@0 = GERMANY AND n_name@1 =
IRAQ OR n_name@0 = IRAQ AND n_name@1 = GERMANY, projection=[n_name@1,
l_extendedprice@2, l_discount@3, l_shipdate@4, n_name@6]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_nationkey@0], 2),
input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: n_name@1 = IRAQ OR n_name@1 = GERMANY
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: n_name@1 = IRAQ OR n_name@1 = GERMANY
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... },
projection=[n_nationkey, n_name], predicate=n_name@1 = IRAQ OR n_name@1 =
GERMANY, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3
THEN false ELSE n_name_min@0 <= IRAQ AND IRAQ <= n_name_max@1 END OR CASE WHEN
n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <=
GERMANY AND GERMANY <= n_name_max@1 END, required_guarantees=[n_name in
(GERMANY, IRAQ)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([c_nationkey@3], 2),
input_partitions=2
@@ -53,9 +53,9 @@ SortPreservingMergeExec: [supp_nation@0 ASC NULLS
LAST,cust_nation@1 ASC NULLS L
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(n_nationkey@0, s_nationkey@0)], projection=[n_name@1, l_extendedprice@3,
l_discount@4, l_shipdate@5, c_nationkey@6]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec:
partitioning=Hash([n_nationkey@0], 2), input_partitions=2
- RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: n_name@1 = GERMANY OR
n_name@1 = IRAQ
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: n_name@1 = GERMANY OR n_name@1
= IRAQ
+ RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
ParquetExec: file_groups={ ... },
projection=[n_nationkey, n_name], predicate=n_name@1 = GERMANY OR n_name@1 =
IRAQ, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN
false ELSE n_name_min@0 <= GERMANY AND GERMANY <= n_name_max@1 END OR CASE WHEN
n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= IRAQ
AND IRAQ <= n_name_max@1 END, required_guarantees=[n_name in (GERMANY, IRAQ)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec:
partitioning=Hash([s_nationkey@0], 2), input_partitions=2
@@ -170,13 +170,13 @@ ShuffleWriterExec(stage_id=10,
output_partitioning=Hash([Column { name: "supp_na
Query Stage #11 (2 -> 2):
ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name:
"supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column {
name: "l_year", index: 2 }], 2))
- SortExec: expr=[supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS
LAST,l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
+ SortExec: expr=[supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC NULLS LAST,
l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[supp_nation@0 as supp_nation, cust_nation@1 as
cust_nation, l_year@2 as l_year, sum(shipping.volume)@3 as revenue]
AggregateExec: mode=FinalPartitioned, gby=[supp_nation@0 as supp_nation,
cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[sum(shipping.volume)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column {
name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 },
Column { name: "l_year", index: 2 }], 2))
Query Stage #12 (2 -> 1):
-SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS
LAST,l_year@2 ASC NULLS LAST]
+SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC
NULLS LAST, l_year@2 ASC NULLS LAST]
ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name:
"supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column {
name: "l_year", index: 2 }], 2))
diff --git a/testdata/expected-plans/q8.txt b/testdata/expected-plans/q8.txt
index f2333a4..e9f5b91 100644
--- a/testdata/expected-plans/q8.txt
+++ b/testdata/expected-plans/q8.txt
@@ -3,7 +3,7 @@ DataFusion Logical Plan
Sort: all_nations.o_year ASC NULLS LAST
Projection: all_nations.o_year, sum(CASE WHEN all_nations.nation =
Utf8("IRAQ") THEN all_nations.volume ELSE Int64(0) END) /
sum(all_nations.volume) AS mkt_share
- Aggregate: groupBy=[[all_nations.o_year]], aggr=[[sum(CASE WHEN
all_nations.nation = Utf8("IRAQ") THEN all_nations.volume ELSE
Decimal128(Some(0),35,4) END) AS sum(CASE WHEN all_nations.nation =
Utf8("IRAQ") THEN all_nations.volume ELSE Int64(0) END),
sum(all_nations.volume)]]
+ Aggregate: groupBy=[[all_nations.o_year]], aggr=[[sum(CASE WHEN
all_nations.nation = Utf8View("IRAQ") THEN all_nations.volume ELSE
Decimal128(Some(0),35,4) END) AS sum(CASE WHEN all_nations.nation =
Utf8("IRAQ") THEN all_nations.volume ELSE Int64(0) END),
sum(all_nations.volume)]]
SubqueryAlias: all_nations
Projection: date_part(Utf8("YEAR"), orders.o_orderdate) AS o_year,
lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS
volume, n2.n_name AS nation
Inner Join: n1.n_regionkey = region.r_regionkey
@@ -20,8 +20,8 @@ Sort: all_nations.o_year ASC NULLS LAST
Projection: lineitem.l_orderkey,
lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount
Inner Join: part.p_partkey =
lineitem.l_partkey
Projection: part.p_partkey
- Filter: part.p_type = Utf8("LARGE PLATED
STEEL")
- TableScan: part projection=[p_partkey,
p_type], partial_filters=[part.p_type = Utf8("LARGE PLATED STEEL")]
+ Filter: part.p_type = Utf8View("LARGE
PLATED STEEL")
+ TableScan: part projection=[p_partkey,
p_type], partial_filters=[part.p_type = Utf8View("LARGE PLATED STEEL")]
TableScan: lineitem
projection=[l_orderkey, l_partkey, l_suppkey, l_extendedprice, l_discount]
TableScan: supplier projection=[s_suppkey,
s_nationkey]
Filter: orders.o_orderdate >= Date32("1995-01-01")
AND orders.o_orderdate <= Date32("1996-12-31")
@@ -32,8 +32,8 @@ Sort: all_nations.o_year ASC NULLS LAST
SubqueryAlias: n2
TableScan: nation projection=[n_nationkey, n_name]
Projection: region.r_regionkey
- Filter: region.r_name = Utf8("MIDDLE EAST")
- TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8("MIDDLE EAST")]
+ Filter: region.r_name = Utf8View("MIDDLE EAST")
+ TableScan: region projection=[r_regionkey, r_name],
partial_filters=[region.r_name = Utf8View("MIDDLE EAST")]
DataFusion Physical Plan
========================
@@ -50,9 +50,9 @@ SortPreservingMergeExec: [o_year@0 ASC NULLS LAST]
HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(r_regionkey@0, n_regionkey@3)], projection=[l_extendedprice@1,
l_discount@2, o_orderdate@3, n_name@5]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([r_regionkey@0], 2),
input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CoalesceBatchesExec: target_batch_size=8192
- FilterExec: r_name@1 = MIDDLE EAST,
projection=[r_regionkey@0]
+ CoalesceBatchesExec: target_batch_size=8192
+ FilterExec: r_name@1 = MIDDLE EAST,
projection=[r_regionkey@0]
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
ParquetExec: file_groups={ ... },
projection=[r_regionkey, r_name], predicate=r_name@1 = MIDDLE EAST,
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false
ELSE r_name_min@0 <= MIDDLE EAST AND MIDDLE EAST <= r_name_max@1 END,
required_guarantees=[r_name in (MIDDLE EAST)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([n_regionkey@3], 2),
input_partitions=2
diff --git a/testdata/expected-plans/q9.txt b/testdata/expected-plans/q9.txt
index 8f738f4..2c713b3 100644
--- a/testdata/expected-plans/q9.txt
+++ b/testdata/expected-plans/q9.txt
@@ -16,8 +16,8 @@ Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS
FIRST
Projection: lineitem.l_orderkey, lineitem.l_partkey,
lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice,
lineitem.l_discount
Inner Join: part.p_partkey = lineitem.l_partkey
Projection: part.p_partkey
- Filter: part.p_name LIKE Utf8("%moccasin%")
- TableScan: part projection=[p_partkey,
p_name], partial_filters=[part.p_name LIKE Utf8("%moccasin%")]
+ Filter: part.p_name LIKE Utf8View("%moccasin%")
+ TableScan: part projection=[p_partkey,
p_name], partial_filters=[part.p_name LIKE Utf8View("%moccasin%")]
TableScan: lineitem projection=[l_orderkey,
l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount]
TableScan: supplier projection=[s_suppkey, s_nationkey]
TableScan: partsupp projection=[ps_partkey, ps_suppkey,
ps_supplycost]
@@ -27,8 +27,8 @@ Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS
FIRST
DataFusion Physical Plan
========================
-SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]
- SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC],
preserve_partitioning=[true]
+SortPreservingMergeExec: [nation@0 ASC NULLS LAST, o_year@1 DESC]
+ SortExec: expr=[nation@0 ASC NULLS LAST, o_year@1 DESC],
preserve_partitioning=[true]
ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year,
sum(profit.amount)@2 as sum_profit]
AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1
as o_year], aggr=[sum(profit.amount)]
CoalesceBatchesExec: target_batch_size=8192
@@ -160,13 +160,13 @@ ShuffleWriterExec(stage_id=10,
output_partitioning=Hash([Column { name: "nation"
Query Stage #11 (2 -> 2):
ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name:
"nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))
- SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC],
preserve_partitioning=[true]
+ SortExec: expr=[nation@0 ASC NULLS LAST, o_year@1 DESC],
preserve_partitioning=[true]
ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year,
sum(profit.amount)@2 as sum_profit]
AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1
as o_year], aggr=[sum(profit.amount)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column {
name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))
Query Stage #12 (2 -> 1):
-SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]
+SortPreservingMergeExec: [nation@0 ASC NULLS LAST, o_year@1 DESC]
ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name:
"nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))
diff --git a/tests/test_context.py b/tests/test_context.py
new file mode 100644
index 0000000..ecc3324
--- /dev/null
+++ b/tests/test_context.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion_ray.context import DatafusionRayContext
+from datafusion import SessionContext, SessionConfig, RuntimeConfig, col, lit,
functions as F
+
+
+def test_basic_query_succeed():
+ df_ctx = SessionContext()
+ ctx = DatafusionRayContext(df_ctx)
+ df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
+ # TODO why does this return a single batch and not a list of batches?
+ record_batches = ctx.sql("SELECT * FROM tips")
+ assert record_batches[0].num_rows == 244
+
+def test_aggregate_csv():
+ df_ctx = SessionContext()
+ ctx = DatafusionRayContext(df_ctx)
+ df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
+ record_batches = ctx.sql("select sex, smoker, avg(tip/total_bill) as
tip_pct from tips group by sex, smoker")
+ assert isinstance(record_batches, list)
+ # TODO why does this return many empty batches?
+ num_rows = 0
+ for record_batch in record_batches:
+ num_rows += record_batch.num_rows
+ assert num_rows == 4
+
+def test_aggregate_parquet():
+ df_ctx = SessionContext()
+ ctx = DatafusionRayContext(df_ctx)
+ df_ctx.register_parquet("tips", "examples/tips.parquet")
+ record_batches = ctx.sql("select sex, smoker, avg(tip/total_bill) as
tip_pct from tips group by sex, smoker")
+ # TODO why does this return many empty batches?
+ num_rows = 0
+ for record_batch in record_batches:
+ num_rows += record_batch.num_rows
+ assert num_rows == 4
+
+def test_aggregate_parquet_dataframe():
+ df_ctx = SessionContext()
+ ray_ctx = DatafusionRayContext(df_ctx)
+ df = df_ctx.read_parquet(f"examples/tips.parquet")
+ df = (
+ df.aggregate(
+ [col("sex"), col("smoker"), col("day"), col("time")],
+ [F.avg(col("tip") / col("total_bill")).alias("tip_pct")],
+ )
+ .filter(col("day") != lit("Dinner"))
+ .aggregate([col("sex"), col("smoker")],
[F.avg(col("tip_pct")).alias("avg_pct")])
+ )
+ ray_results = ray_ctx.plan(df.execution_plan())
+ df_ctx.create_dataframe([ray_results]).show()
+
+
+def test_no_result_query():
+ df_ctx = SessionContext()
+ ctx = DatafusionRayContext(df_ctx)
+ df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
+ ctx.sql("CREATE VIEW tips_view AS SELECT * FROM tips")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]