This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git


The following commit(s) were added to refs/heads/main by this push:
     new 151a0e2  Upgrade to DataFusion 43, fix a bug, add more tests (#53)
151a0e2 is described below

commit 151a0e280bcd3239210a65d62a69d596844fc90d
Author: Andy Grove <[email protected]>
AuthorDate: Sat Dec 14 11:28:39 2024 -0700

    Upgrade to DataFusion 43, fix a bug, add more tests (#53)
    
    * Implementing Unit testing for Python
    
    * Installing all deps in CI
    
    * Adding maturin develop
    
    * Restoring correct input partitioning
    
    * Generated new plans
    
    * Restored test plans for ignored tests
    
    * tests
    
    * fix
    
    * fix
    
    * update expected plans
    
    * update expected plans
    
    * revert some changes
    
    * remove comment
    
    * updated plans
    
    * upgrade to DF 43
    
    * update deps, more tests
    
    * bug fix
    
    ---------
    
    Co-authored-by: Edmondo Porcu <[email protected]>
---
 .github/workflows/rust.yml           |  13 ++-
 Cargo.lock                           | 186 +++++++++++++++++++----------------
 Cargo.toml                           |   4 +-
 datafusion_ray/context.py            |   5 +-
 datafusion_ray/tests/test_context.py |  26 -----
 examples/tips.py                     |   2 +-
 pyproject.toml                       |   4 +-
 requirements-in.txt                  |   4 +-
 {datafusion_ray => scripts}/main.py  |   0
 src/planner.rs                       |   9 +-
 src/query_stage.rs                   |  14 ++-
 testdata/expected-plans/q1.txt       |   8 +-
 testdata/expected-plans/q10.txt      |   4 +-
 testdata/expected-plans/q11.txt      |  20 ++--
 testdata/expected-plans/q12.txt      |  10 +-
 testdata/expected-plans/q13.txt      |  12 +--
 testdata/expected-plans/q14.txt      |   2 +-
 testdata/expected-plans/q16.txt      |  20 ++--
 testdata/expected-plans/q17.txt      |   4 +-
 testdata/expected-plans/q18.txt      |   8 +-
 testdata/expected-plans/q19.txt      |  22 ++---
 testdata/expected-plans/q2.txt       |  32 +++---
 testdata/expected-plans/q20.txt      |  22 ++---
 testdata/expected-plans/q21.txt      |  22 ++---
 testdata/expected-plans/q22.txt      |  20 ++--
 testdata/expected-plans/q3.txt       |  12 +--
 testdata/expected-plans/q5.txt       |  10 +-
 testdata/expected-plans/q7.txt       |  30 +++---
 testdata/expected-plans/q8.txt       |  16 +--
 testdata/expected-plans/q9.txt       |  12 +--
 tests/test_context.py                |  73 ++++++++++++++
 31 files changed, 346 insertions(+), 280 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 3e03704..8620498 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -32,14 +32,14 @@ jobs:
     runs-on: ubuntu-latest
 
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: Install protobuf compiler
         shell: bash
         run: sudo apt-get install protobuf-compiler
       - name: Build Rust code
         run: cargo build --verbose
       - name: Set up Python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ env.PYTHON_VERSION }}
       - name: Install test dependencies
@@ -49,5 +49,12 @@ jobs:
       - name: Generate test data
         run: |
           ./scripts/gen-test-data.sh
-      - name: Run tests
+      - name: Run Rust tests
         run: cargo test --verbose
+      - name: Run Python tests
+        run: |
+          python -m venv venv
+          source venv/bin/activate
+          pip install -r requirements-in.txt
+          maturin develop
+          python -m pytest
diff --git a/Cargo.lock b/Cargo.lock
index a976126..e7a25c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -130,9 +130,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
 
 [[package]]
 name = "arrow"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a9ba0d7248932f4e2a12fb37f0a2e3ec82b3bdedbac2a1dce186e036843b8f8c"
+checksum = "c91839b07e474b3995035fd8ac33ee54f9c9ccbbb1ea33d9909c71bffdf1259d"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -152,9 +152,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-arith"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d60afcdc004841a5c8d8da4f4fa22d64eb19c0c01ef4bcedd77f175a7cf6e38f"
+checksum = "855c57c4efd26722b044dcd3e348252560e3e0333087fb9f6479dc0bf744054f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -167,9 +167,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-array"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7f16835e8599dbbb1659fd869d865254c4cf32c6c2bb60b6942ac9fc36bfa5da"
+checksum = "bd03279cea46569acf9295f6224fbc370c5df184b4d2ecfe97ccb131d5615a7f"
 dependencies = [
  "ahash",
  "arrow-buffer",
@@ -178,15 +178,15 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "half",
- "hashbrown 0.14.5",
+ "hashbrown 0.15.2",
  "num",
 ]
 
 [[package]]
 name = "arrow-buffer"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1a1f34f0faae77da6b142db61deba2cb6d60167592b178be317b341440acba80"
+checksum = "9e4a9b9b1d6d7117f6138e13bc4dd5daa7f94e671b70e8c9c4dc37b4f5ecfc16"
 dependencies = [
  "bytes",
  "half",
@@ -195,9 +195,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-cast"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "450e4abb5775bca0740bec0bcf1b1a5ae07eff43bd625661c4436d8e8e4540c4"
+checksum = "bc70e39916e60c5b7af7a8e2719e3ae589326039e1e863675a008bee5ffe90fd"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -216,9 +216,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-csv"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d3a4e4d63830a341713e35d9a42452fbc6241d5f42fa5cf6a4681b8ad91370c4"
+checksum = "789b2af43c1049b03a8d088ff6b2257cdcea1756cd76b174b1f2600356771b97"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -235,9 +235,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-data"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "2b1e618bbf714c7a9e8d97203c806734f012ff71ae3adc8ad1b075689f540634"
+checksum = "e4e75edf21ffd53744a9b8e3ed11101f610e7ceb1a29860432824f1834a1f623"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -247,9 +247,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ipc"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f98e983549259a2b97049af7edfb8f28b8911682040e99a94e4ceb1196bd65c2"
+checksum = "d186a909dece9160bf8312f5124d797884f608ef5435a36d9d608e0b2a9bcbf8"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -262,9 +262,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-json"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b198b9c6fcf086501730efbbcb483317b39330a116125af7bb06467d04b352a3"
+checksum = "b66ff2fedc1222942d0bd2fd391cb14a85baa3857be95c9373179bd616753b85"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -282,9 +282,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ord"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "2427f37b4459a4b9e533045abe87a5183a5e0995a3fc2c2fd45027ae2cc4ef3f"
+checksum = "ece7b5bc1180e6d82d1a60e1688c199829e8842e38497563c3ab6ea813e527fd"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -297,9 +297,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-row"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "15959657d92e2261a7a323517640af87f5afd9fd8a6492e424ebee2203c567f6"
+checksum = "745c114c8f0e8ce211c83389270de6fbe96a9088a7b32c2a041258a443fe83ff"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -311,18 +311,18 @@ dependencies = [
 
 [[package]]
 name = "arrow-schema"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "fbf0388a18fd7f7f3fe3de01852d30f54ed5182f9004db700fbe3ba843ed2794"
+checksum = "b95513080e728e4cec37f1ff5af4f12c9688d47795d17cda80b6ec2cf74d4678"
 dependencies = [
  "bitflags 2.6.0",
 ]
 
 [[package]]
 name = "arrow-select"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b83e5723d307a38bf00ecd2972cd078d1339c7fd3eb044f609958a9a24463f3a"
+checksum = "8e415279094ea70323c032c6e739c48ad8d80e78a09bef7117b8718ad5bf3722"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -334,9 +334,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-string"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7ab3db7c09dd826e74079661d84ed01ed06547cf75d52c2818ef776d0d852305"
+checksum = "11d956cae7002eb8d83a27dbd34daaea1cf5b75852f0b84deb4d93a276e92bbf"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -459,9 +459,9 @@ dependencies = [
 
 [[package]]
 name = "brotli"
-version = "6.0.0"
+version = "7.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
+checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
@@ -702,9 +702,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ee907b081e45e1d14e1f327e89ef134f91fcebad0bfc2dc229fa9f6044379682"
+checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05"
 dependencies = [
  "ahash",
  "apache-avro",
@@ -761,9 +761,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-catalog"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6c2b914f6e33c429af7d8696c72a47ed9225d7e2b82c747ebdfa2408ed53579f"
+checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560"
 dependencies = [
  "arrow-schema",
  "async-trait",
@@ -776,9 +776,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3a84f8e76330c582a6b8ada0b2c599ca46cfe46b7585e458fc3f4092bc722a18"
+checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c"
 dependencies = [
  "ahash",
  "apache-avro",
@@ -789,6 +789,7 @@ dependencies = [
  "chrono",
  "half",
  "hashbrown 0.14.5",
+ "indexmap",
  "instant",
  "libc",
  "num_cpus",
@@ -802,9 +803,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common-runtime"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "cf08cc30d92720d557df13bd5a5696213bd5ea0f38a866d8d85055d866fba774"
+checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685"
 dependencies = [
  "log",
  "tokio",
@@ -812,9 +813,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-execution"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "86bc4183d5c45b9f068a6f351678a0d1eb1225181424542bb75db18ec280b822"
+checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361"
 dependencies = [
  "arrow",
  "chrono",
@@ -833,9 +834,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "202119ce58e4d103e37ae64aab40d4e574c97bdd2bea994bf307b175fcbfa74d"
+checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246"
 dependencies = [
  "ahash",
  "arrow",
@@ -845,7 +846,9 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr-common",
  "datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
  "datafusion-physical-expr-common",
+ "indexmap",
  "paste",
  "serde_json",
  "sqlparser",
@@ -855,20 +858,21 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr-common"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f8b181ce8569216abb01ef3294aa16c0a40d7d39350c2ff01ede00f167a535f2"
+checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2"
 dependencies = [
  "arrow",
  "datafusion-common",
+ "itertools 0.13.0",
  "paste",
 ]
 
 [[package]]
 name = "datafusion-functions"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6e4124b8066444e05a24472f852e94cf56546c0f4d92d00f018f207216902712"
+checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162"
 dependencies = [
  "arrow",
  "arrow-buffer",
@@ -893,9 +897,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-aggregate"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b94acdac235ea21810150a89751617ef2db7e32eba27f54be48a81bde2bfe119"
+checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741"
 dependencies = [
  "ahash",
  "arrow",
@@ -907,16 +911,16 @@ dependencies = [
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "half",
+ "indexmap",
  "log",
  "paste",
- "sqlparser",
 ]
 
 [[package]]
 name = "datafusion-functions-aggregate-common"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5c9ea085bbf900bf16e2ca0f56fc56236b2e4f2e1a2cccb67bcd83c5ab4ad0ef"
+checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143"
 dependencies = [
  "ahash",
  "arrow",
@@ -928,9 +932,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-nested"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6c882e61665ed60c5ce9b061c1e587aeb8ae5ae4bcb5e5f2465139ab25328e0f"
+checksum = "fca25bbb87323716d05e54114666e942172ccca23c5a507e9c7851db6e965317"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -951,21 +955,34 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-window"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "98a354ce96df3ca6d025093adac9fd55ca09931c9b6f2630140721a95873fde4"
+checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe"
 dependencies = [
  "datafusion-common",
  "datafusion-expr",
+ "datafusion-functions-window-common",
+ "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "log",
+ "paste",
+]
+
+[[package]]
+name = "datafusion-functions-window-common"
+version = "43.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b"
+dependencies = [
+ "datafusion-common",
+ "datafusion-physical-expr-common",
 ]
 
 [[package]]
 name = "datafusion-optimizer"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "baf677c74fb7b5a1899ef52709e4a70fff3ed80bdfb4bbe495909810e83d5f39"
+checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa"
 dependencies = [
  "arrow",
  "async-trait",
@@ -983,9 +1000,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "30b077999f6eb6c43d6b25bc66332a3be2f693c382840f008dd763b8540f9530"
+checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335"
 dependencies = [
  "ahash",
  "arrow",
@@ -994,30 +1011,26 @@ dependencies = [
  "arrow-ord",
  "arrow-schema",
  "arrow-string",
- "base64",
  "chrono",
  "datafusion-common",
- "datafusion-execution",
  "datafusion-expr",
  "datafusion-expr-common",
  "datafusion-functions-aggregate-common",
  "datafusion-physical-expr-common",
  "half",
  "hashbrown 0.14.5",
- "hex",
  "indexmap",
  "itertools 0.13.0",
  "log",
  "paste",
  "petgraph",
- "regex",
 ]
 
 [[package]]
 name = "datafusion-physical-expr-common"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "dce847f885c2b13bbe29f5c8b7948797131aa470af6e16d2a94f4428b4f4f1bd"
+checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b"
 dependencies = [
  "ahash",
  "arrow",
@@ -1029,13 +1042,15 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-optimizer"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d13238e3b9fdd62a4c18760bfef714bb990d1e1d3430e9f416aae4b3cfaa71af"
+checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe"
 dependencies = [
+ "arrow",
  "arrow-schema",
  "datafusion-common",
  "datafusion-execution",
+ "datafusion-expr-common",
  "datafusion-physical-expr",
  "datafusion-physical-plan",
  "itertools 0.13.0",
@@ -1043,9 +1058,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-plan"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "faba6f55a7eaf0241d07d12c2640de52742646b10f754485d5192bdfe2c9ceae"
+checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24"
 dependencies = [
  "ahash",
  "arrow",
@@ -1059,8 +1074,8 @@ dependencies = [
  "datafusion-common-runtime",
  "datafusion-execution",
  "datafusion-expr",
- "datafusion-functions-aggregate",
  "datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "futures",
@@ -1078,9 +1093,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-proto"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "585357d621fa03ea85a7fefca79ebc5ef0ee13a7f82be0762a414879a4d190a7"
+checksum = "f730f7fc5a20134d4e5ecdf7bbf392002ac58163d58423ea28a702dc077b06e1"
 dependencies = [
  "arrow",
  "chrono",
@@ -1094,9 +1109,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-proto-common"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "4db6534382f92f528bdb5d925b4214c31ffd84fa7fe1eff3ed0d2f1286851ab8"
+checksum = "12c225fe49e4f943e35446b263613ada7a9e9f8d647544e6b07037b9803567df"
 dependencies = [
  "arrow",
  "chrono",
@@ -1107,15 +1122,16 @@ dependencies = [
 
 [[package]]
 name = "datafusion-sql"
-version = "42.0.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "dad8d96a9b52e1aa24f9373696a815be828193efce7cb0bbd2140b6bb67d1819"
+checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51"
 dependencies = [
  "arrow",
  "arrow-array",
  "arrow-schema",
  "datafusion-common",
  "datafusion-expr",
+ "indexmap",
  "log",
  "regex",
  "sqlparser",
@@ -1368,9 +1384,9 @@ dependencies = [
 
 [[package]]
 name = "hashbrown"
-version = "0.15.0"
+version = "0.15.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
+checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
 
 [[package]]
 name = "heck"
@@ -1451,7 +1467,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
 dependencies = [
  "equivalent",
- "hashbrown 0.15.0",
+ "hashbrown 0.15.2",
 ]
 
 [[package]]
@@ -1862,9 +1878,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "53.1.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e"
+checksum = "2b449890367085eb65d7d3321540abc3d7babbd179ce31df0016e90719114191"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -1881,7 +1897,7 @@ dependencies = [
  "flate2",
  "futures",
  "half",
- "hashbrown 0.14.5",
+ "hashbrown 0.15.2",
  "lz4_flex",
  "num",
  "num-bigint",
@@ -2437,9 +2453,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
 [[package]]
 name = "sqlparser"
-version = "0.50.0"
+version = "0.51.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac"
+checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
 dependencies = [
  "log",
  "sqlparser_derive",
diff --git a/Cargo.toml b/Cargo.toml
index 91b0d11..cf145c4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,8 +29,8 @@ rust-version = "1.62"
 build = "build.rs"
 
 [dependencies]
-datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
-datafusion-proto = "42.0.0"
+datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
+datafusion-proto = "43.0"
 futures = "0.3"
 glob = "0.3.1"
 log = "0.4"
diff --git a/datafusion_ray/context.py b/datafusion_ray/context.py
index f2ef86f..0070220 100644
--- a/datafusion_ray/context.py
+++ b/datafusion_ray/context.py
@@ -115,7 +115,7 @@ def execute_query_partition(
         "ph": "X",
     }
     print(json.dumps(event), end=",")
-    return ret[0] if len(ret) == 1 else ret
+    return ret
 
 
 class DatafusionRayContext:
@@ -143,7 +143,7 @@ class DatafusionRayContext:
         df = self.df_ctx.sql(sql)
         return self.plan(df.execution_plan())
 
-    def plan(self, execution_plan: Any) -> pa.RecordBatch:
+    def plan(self, execution_plan: Any) -> List[pa.RecordBatch]:
 
         graph = self.ctx.plan(execution_plan)
         final_stage_id = graph.get_final_query_stage().id()
@@ -161,4 +161,3 @@ class DatafusionRayContext:
         # assert len(partitions) == 1, len(partitions)
         result_set = ray.get(partitions[0])
         return result_set
-
diff --git a/datafusion_ray/tests/test_context.py b/datafusion_ray/tests/test_context.py
deleted file mode 100644
index 40b2578..0000000
--- a/datafusion_ray/tests/test_context.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from datafusion_ray import Context
-from datafusion import SessionContext
-
-
-def test():
-    df_ctx = SessionContext()
-    ctx = Context(df_ctx, False)
-    df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
-    ctx.plan("SELECT * FROM tips")
diff --git a/examples/tips.py b/examples/tips.py
index 3a2fa91..67ac64e 100644
--- a/examples/tips.py
+++ b/examples/tips.py
@@ -52,4 +52,4 @@ df = (
 )
 
 ray_results = ray_ctx.plan(df.execution_plan())
-df_ctx.create_dataframe([[ray_results]]).show()
+df_ctx.create_dataframe([ray_results]).show()
diff --git a/pyproject.toml b/pyproject.toml
index 10e097e..3a4eb7d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,8 +28,8 @@ classifiers = [
     "Programming Language :: Python :: Implementation :: PyPy",
 ]
 dependencies = [
-    "datafusion>=42.0.0",
-    "pyarrow>=11.0.0",
+    "datafusion>=43.0.0",
+    "pyarrow>=18.0.0",
     "typing-extensions;python_version<'3.13'",
 ]
 
diff --git a/requirements-in.txt b/requirements-in.txt
index 3fa00a6..b8216e9 100644
--- a/requirements-in.txt
+++ b/requirements-in.txt
@@ -4,9 +4,9 @@ isort
 maturin
 mypy
 numpy
-pyarrow
+pyarrow>=18.0.0
 pytest
 ray==2.37.0
-datafusion>=42.0.0
+datafusion>=43.0.0
 toml
 importlib_metadata; python_version < "3.8"
diff --git a/datafusion_ray/main.py b/scripts/main.py
similarity index 100%
rename from datafusion_ray/main.py
rename to scripts/main.py
diff --git a/src/planner.rs b/src/planner.rs
index 7d9fdf0..954d8e2 100644
--- a/src/planner.rs
+++ b/src/planner.rs
@@ -276,7 +276,6 @@ mod test {
         do_test(6).await
     }
 
-    #[ignore = "non-deterministic IN clause"]
     #[tokio::test]
     async fn test_q7() -> TestResult<()> {
         do_test(7).await
@@ -302,7 +301,6 @@ mod test {
         do_test(11).await
     }
 
-    #[ignore = "non-deterministic IN clause"]
     #[tokio::test]
     async fn test_q12() -> TestResult<()> {
         do_test(12).await
@@ -324,10 +322,6 @@ mod test {
         do_test(15).await
     }
 
-    // This test is ignored because there is some non-determinism
-    // in a part of the plan, see
-    // https://github.com/edmondop/datafusion-ray/actions/runs/11180062292/job/31080996808
-    #[ignore = "non-deterministic IN clause"]
     #[tokio::test]
     async fn test_q16() -> TestResult<()> {
         do_test(16).await
@@ -343,7 +337,6 @@ mod test {
         do_test(18).await
     }
 
-    #[ignore = "non-deterministic IN clause"]
     #[tokio::test]
     async fn test_q19() -> TestResult<()> {
         do_test(19).await
@@ -378,7 +371,7 @@ mod test {
         ];
         for table in tables {
             ctx.register_parquet(
-                table,
+                *table,
                 &format!("{data_path}/{table}.parquet"),
                 ParquetReadOptions::default(),
             )
diff --git a/src/query_stage.rs b/src/query_stage.rs
index 084cd72..05c090b 100644
--- a/src/query_stage.rs
+++ b/src/query_stage.rs
@@ -18,7 +18,7 @@
 use crate::context::serialize_execution_plan;
 use crate::shuffle::{ShuffleCodec, ShuffleReaderExec};
 use datafusion::error::Result;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning};
 use datafusion::prelude::SessionContext;
 use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec;
 use pyo3::prelude::*;
@@ -99,10 +99,14 @@ impl QueryStage {
     /// Get the input partition count. This is the same as the number of concurrent tasks
     /// when we schedule this query stage for execution
     pub fn get_input_partition_count(&self) -> usize {
-        self.plan.children()[0]
-            .properties()
-            .output_partitioning()
-            .partition_count()
+        if self.plan.children().is_empty() {
+            // leaf node (file scan)
+            self.plan.output_partitioning().partition_count()
+        } else {
+            self.plan.children()[0]
+                .output_partitioning()
+                .partition_count()
+        }
     }
 
     pub fn get_output_partition_count(&self) -> usize {
diff --git a/testdata/expected-plans/q1.txt b/testdata/expected-plans/q1.txt
index 8eaff99..282d5da 100644
--- a/testdata/expected-plans/q1.txt
+++ b/testdata/expected-plans/q1.txt
@@ -11,8 +11,8 @@ Sort: lineitem.l_returnflag ASC NULLS LAST, 
lineitem.l_linestatus ASC NULLS LAST
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC 
NULLS LAST]
-  SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS 
LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC 
NULLS LAST]
+  SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS 
LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as 
l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, 
sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice 
* Int64(1) - lineitem.l_discount)@4 as sum_disc_price, 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + 
lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, 
avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as 
avg_d [...]
       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), 
sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), 
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -36,13 +36,13 @@ ShuffleWriterExec(stage_id=0, 
output_partitioning=Hash([Column { name: "l_return
 
 Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: 
"l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
-  SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS 
LAST], preserve_partitioning=[true]
+  SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS 
LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as 
l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, 
sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice 
* Int64(1) - lineitem.l_discount)@4 as sum_disc_price, 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + 
lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, 
avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as 
avg_d [...]
       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), 
sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), 
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { 
name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 
2))
 
 Query Stage #2 (2 -> 1):
-SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC 
NULLS LAST]
+SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC 
NULLS LAST]
   ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: 
"l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
 
diff --git a/testdata/expected-plans/q10.txt b/testdata/expected-plans/q10.txt
index 916dcbb..046f69e 100644
--- a/testdata/expected-plans/q10.txt
+++ b/testdata/expected-plans/q10.txt
@@ -15,8 +15,8 @@ Sort: revenue DESC NULLS FIRST, fetch=20
                     Filter: orders.o_orderdate >= Date32("1993-07-01") AND 
orders.o_orderdate < Date32("1993-10-01")
                       TableScan: orders projection=[o_orderkey, o_custkey, 
o_orderdate], partial_filters=[orders.o_orderdate >= Date32("1993-07-01"), 
orders.o_orderdate < Date32("1993-10-01")]
               Projection: lineitem.l_orderkey, lineitem.l_extendedprice, 
lineitem.l_discount
-                Filter: lineitem.l_returnflag = Utf8("R")
-                  TableScan: lineitem projection=[l_orderkey, l_extendedprice, 
l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8("R")]
+                Filter: lineitem.l_returnflag = Utf8View("R")
+                  TableScan: lineitem projection=[l_orderkey, l_extendedprice, 
l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = 
Utf8View("R")]
           TableScan: nation projection=[n_nationkey, n_name]
 
 DataFusion Physical Plan
diff --git a/testdata/expected-plans/q11.txt b/testdata/expected-plans/q11.txt
index 4478944..74f74d7 100644
--- a/testdata/expected-plans/q11.txt
+++ b/testdata/expected-plans/q11.txt
@@ -12,8 +12,8 @@ Sort: value DESC NULLS FIRST
                 TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_availqty, ps_supplycost]
                 TableScan: supplier projection=[s_suppkey, s_nationkey]
             Projection: nation.n_nationkey
-              Filter: nation.n_name = Utf8("ALGERIA")
-                TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("ALGERIA")]
+              Filter: nation.n_name = Utf8View("ALGERIA")
+                TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("ALGERIA")]
       SubqueryAlias: __scalar_sq_1
         Projection: CAST(CAST(sum(partsupp.ps_supplycost * 
partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15))
           Aggregate: groupBy=[[]], aggr=[[sum(partsupp.ps_supplycost * 
CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
@@ -24,8 +24,8 @@ Sort: value DESC NULLS FIRST
                     TableScan: partsupp projection=[ps_suppkey, ps_availqty, 
ps_supplycost]
                     TableScan: supplier projection=[s_suppkey, s_nationkey]
                 Projection: nation.n_nationkey
-                  Filter: nation.n_name = Utf8("ALGERIA")
-                    TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("ALGERIA")]
+                  Filter: nation.n_name = Utf8View("ALGERIA")
+                    TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("ALGERIA")]
 
 DataFusion Physical Plan
 ========================
@@ -42,9 +42,9 @@ SortPreservingMergeExec: [value@1 DESC]
                   HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(n_nationkey@0, s_nationkey@2)], projection=[ps_availqty@1, ps_supplycost@2]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([n_nationkey@0], 2), 
input_partitions=2
-                        RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-                          CoalesceBatchesExec: target_batch_size=8192
-                            FilterExec: n_name@1 = ALGERIA, 
projection=[n_nationkey@0]
+                        CoalesceBatchesExec: target_batch_size=8192
+                          FilterExec: n_name@1 = ALGERIA, 
projection=[n_nationkey@0]
+                            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                               ParquetExec: file_groups={ ... }, 
projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, 
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false 
ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, 
required_guarantees=[n_name in (ALGERIA)]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([s_nationkey@2], 2), 
input_partitions=2
@@ -66,9 +66,9 @@ SortPreservingMergeExec: [value@1 DESC]
                   HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(n_nationkey@0, s_nationkey@3)], projection=[ps_partkey@1, ps_availqty@2, 
ps_supplycost@3]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([n_nationkey@0], 2), 
input_partitions=2
-                        RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-                          CoalesceBatchesExec: target_batch_size=8192
-                            FilterExec: n_name@1 = ALGERIA, 
projection=[n_nationkey@0]
+                        CoalesceBatchesExec: target_batch_size=8192
+                          FilterExec: n_name@1 = ALGERIA, 
projection=[n_nationkey@0]
+                            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                               ParquetExec: file_groups={ ... }, 
projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, 
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false 
ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, 
required_guarantees=[n_name in (ALGERIA)]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([s_nationkey@3], 2), 
input_partitions=2
diff --git a/testdata/expected-plans/q12.txt b/testdata/expected-plans/q12.txt
index f2052fb..c7ae269 100644
--- a/testdata/expected-plans/q12.txt
+++ b/testdata/expected-plans/q12.txt
@@ -3,13 +3,13 @@ DataFusion Logical Plan
 
 Sort: lineitem.l_shipmode ASC NULLS LAST
   Projection: lineitem.l_shipmode, sum(CASE WHEN orders.o_orderpriority = 
Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE 
Int64(0) END) AS high_line_count, sum(CASE WHEN orders.o_orderpriority != 
Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) 
ELSE Int64(0) END) AS low_line_count
-    Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN 
orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = 
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN 
orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != 
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
+    Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN 
orders.o_orderpriority = Utf8View("1-URGENT") OR orders.o_orderpriority = 
Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHEN 
orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = 
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN 
orders.o_orderpriority != Utf8View("1-URGENT") AND orders.o_orderpriority != 
Utf8View("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS sum(CASE WHEN o [...]
       Projection: orders.o_orderpriority, lineitem.l_shipmode
         Inner Join: orders.o_orderkey = lineitem.l_orderkey
           TableScan: orders projection=[o_orderkey, o_orderpriority]
           Projection: lineitem.l_orderkey, lineitem.l_shipmode
-            Filter: (lineitem.l_shipmode = Utf8("FOB") OR lineitem.l_shipmode 
= Utf8("SHIP")) AND lineitem.l_receiptdate > lineitem.l_commitdate AND 
lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >= 
Date32("1995-01-01") AND lineitem.l_receiptdate < Date32("1996-01-01")
-              TableScan: lineitem projection=[l_orderkey, l_shipdate, 
l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode 
= Utf8("FOB") OR lineitem.l_shipmode = Utf8("SHIP"), lineitem.l_receiptdate > 
lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate, 
lineitem.l_receiptdate >= Date32("1995-01-01"), lineitem.l_receiptdate < 
Date32("1996-01-01")]
+            Filter: (lineitem.l_shipmode = Utf8View("FOB") OR 
lineitem.l_shipmode = Utf8View("SHIP")) AND lineitem.l_receiptdate > 
lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND 
lineitem.l_receiptdate >= Date32("1995-01-01") AND lineitem.l_receiptdate < 
Date32("1996-01-01")
+              TableScan: lineitem projection=[l_orderkey, l_shipdate, 
l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode 
= Utf8View("FOB") OR lineitem.l_shipmode = Utf8View("SHIP"), 
lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate < 
lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("1995-01-01"), 
lineitem.l_receiptdate < Date32("1996-01-01")]
 
 DataFusion Physical Plan
 ========================
@@ -28,7 +28,7 @@ SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
                       RepartitionExec: partitioning=Hash([l_orderkey@0], 2), 
input_partitions=2
                         CoalesceBatchesExec: target_batch_size=8192
                           FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 = 
SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 
AND l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01, 
projection=[l_orderkey@0, l_shipmode@4]
-                            ParquetExec: file_groups={ ... }, 
projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], 
predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > 
l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 
1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN 
l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE 
l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END O [...]
+                            ParquetExec: file_groups={ ... }, 
projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], 
predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > 
l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 
1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN 
l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE 
l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END O [...]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([o_orderkey@0], 2), 
input_partitions=2
                         ParquetExec: file_groups={ ... }, 
projection=[o_orderkey, o_orderpriority]
@@ -40,7 +40,7 @@ Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: 
"l_orderkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 = SHIP) AND 
l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND 
l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01, 
projection=[l_orderkey@0, l_shipmode@4]
-      ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, 
l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR 
l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 
< l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 
1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = 
l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= 
l_shipmode_max@1 END OR CASE WHEN l_shipmode [...]
+      ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, 
l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR 
l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 
< l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 
1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = 
l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= 
l_shipmode_max@1 END OR CASE WHEN l_shipmode [...]
 
 Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: 
"o_orderkey", index: 0 }], 2))
diff --git a/testdata/expected-plans/q13.txt b/testdata/expected-plans/q13.txt
index 691f45e..366db12 100644
--- a/testdata/expected-plans/q13.txt
+++ b/testdata/expected-plans/q13.txt
@@ -11,14 +11,14 @@ Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC 
NULLS FIRST
               Left Join: customer.c_custkey = orders.o_custkey
                 TableScan: customer projection=[c_custkey]
                 Projection: orders.o_orderkey, orders.o_custkey
-                  Filter: orders.o_comment NOT LIKE Utf8("%express%requests%")
-                    TableScan: orders projection=[o_orderkey, o_custkey, 
o_comment], partial_filters=[orders.o_comment NOT LIKE 
Utf8("%express%requests%")]
+                  Filter: orders.o_comment NOT LIKE 
Utf8View("%express%requests%")
+                    TableScan: orders projection=[o_orderkey, o_custkey, 
o_comment], partial_filters=[orders.o_comment NOT LIKE 
Utf8View("%express%requests%")]
 
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
-  SortExec: expr=[custdist@1 DESC,c_count@0 DESC], preserve_partitioning=[true]
+SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC]
+  SortExec: expr=[custdist@1 DESC, c_count@0 DESC], 
preserve_partitioning=[true]
     ProjectionExec: expr=[c_count@0 as c_count, count(*)@1 as custdist]
       AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], 
aggr=[count(*)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -64,13 +64,13 @@ ShuffleWriterExec(stage_id=2, 
output_partitioning=Hash([Column { name: "c_count"
 
 Query Stage #3 (2 -> 2):
 ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: 
"c_count", index: 0 }], 2))
-  SortExec: expr=[custdist@1 DESC,c_count@0 DESC], preserve_partitioning=[true]
+  SortExec: expr=[custdist@1 DESC, c_count@0 DESC], 
preserve_partitioning=[true]
     ProjectionExec: expr=[c_count@0 as c_count, count(*)@1 as custdist]
       AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], 
aggr=[count(*)]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { 
name: "c_count", index: 0 }], 2))
 
 Query Stage #4 (2 -> 1):
-SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
+SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC]
   ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: 
"c_count", index: 0 }], 2))
 
diff --git a/testdata/expected-plans/q14.txt b/testdata/expected-plans/q14.txt
index 81ef8ef..67d16d6 100644
--- a/testdata/expected-plans/q14.txt
+++ b/testdata/expected-plans/q14.txt
@@ -2,7 +2,7 @@ DataFusion Logical Plan
 =======================
 
 Projection: Float64(100) * CAST(sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") 
THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) 
END) AS Float64) / CAST(sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount) AS Float64) AS promo_revenue
-  Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN part.p_type LIKE 
Utf8("PROMO%") THEN __common_expr_1 ELSE Decimal128(Some(0),35,4) END) AS 
sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) AS 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+  Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN part.p_type LIKE 
Utf8View("PROMO%") THEN __common_expr_1 ELSE Decimal128(Some(0),35,4) END) AS 
sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) AS 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
     Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS __common_expr_1, part.p_type
       Inner Join: lineitem.l_partkey = part.p_partkey
         Projection: lineitem.l_partkey, lineitem.l_extendedprice, 
lineitem.l_discount
diff --git a/testdata/expected-plans/q16.txt b/testdata/expected-plans/q16.txt
index 5ef333a..24ecb18 100644
--- a/testdata/expected-plans/q16.txt
+++ b/testdata/expected-plans/q16.txt
@@ -9,18 +9,18 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS 
LAST, part.p_type AS
           Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, 
part.p_size
             Inner Join: partsupp.ps_partkey = part.p_partkey
               TableScan: partsupp projection=[ps_partkey, ps_suppkey]
-              Filter: part.p_brand != Utf8("Brand#14") AND part.p_type NOT 
LIKE Utf8("SMALL PLATED%") AND part.p_size IN ([Int32(14), Int32(6), Int32(5), 
Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])
-                TableScan: part projection=[p_partkey, p_brand, p_type, 
p_size], partial_filters=[part.p_brand != Utf8("Brand#14"), part.p_type NOT 
LIKE Utf8("SMALL PLATED%"), part.p_size IN ([Int32(14), Int32(6), Int32(5), 
Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])]
+              Filter: part.p_brand != Utf8View("Brand#14") AND part.p_type NOT 
LIKE Utf8View("SMALL PLATED%") AND part.p_size IN ([Int32(14), Int32(6), 
Int32(5), Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])
+                TableScan: part projection=[p_partkey, p_brand, p_type, 
p_size], partial_filters=[part.p_brand != Utf8View("Brand#14"), part.p_type NOT 
LIKE Utf8View("SMALL PLATED%"), part.p_size IN ([Int32(14), Int32(6), Int32(5), 
Int32(31), Int32(49), Int32(15), Int32(41), Int32(47)])]
           SubqueryAlias: __correlated_sq_1
             Projection: supplier.s_suppkey
-              Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
-                TableScan: supplier projection=[s_suppkey, s_comment], 
partial_filters=[supplier.s_comment LIKE Utf8("%Customer%Complaints%")]
+              Filter: supplier.s_comment LIKE Utf8View("%Customer%Complaints%")
+                TableScan: supplier projection=[s_suppkey, s_comment], 
partial_filters=[supplier.s_comment LIKE Utf8View("%Customer%Complaints%")]
 
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS 
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
-  SortExec: expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC 
NULLS LAST,p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, 
p_type@1 ASC NULLS LAST, p_size@2 ASC NULLS LAST]
+  SortExec: expr=[supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, p_type@1 ASC 
NULLS LAST, p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 
as p_size, count(alias1)@3 as supplier_cnt]
       AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, 
p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -48,7 +48,7 @@ SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC 
NULLS LAST,p_type@1
                                         CoalesceBatchesExec: 
target_batch_size=8192
                                           FilterExec: p_brand@1 != Brand#14 
AND p_type@2 NOT LIKE SMALL PLATED% AND Use p_size@3 IN (SET) ([Literal { 
value: Int32(14) }, Literal { value: Int32(6) }, Literal { value: Int32(5) }, 
Literal { value: Int32(31) }, Literal { value: Int32(49) }, Literal { value: 
Int32(15) }, Literal { value: Int32(41) }, Literal { value: Int32(47) }])
                                             RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
-                                              ParquetExec: file_groups={ ... 
}]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 
THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND 
(CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE 
p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 
= p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 
END OR CASE WHEN p_size_null_count@6 [...]
+                                              ParquetExec: file_groups={ ... 
}]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 
THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND 
(CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE 
p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 
= p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 
END OR CASE WHEN p_size_null_count@6 [...]
                                     CoalesceBatchesExec: target_batch_size=8192
                                       RepartitionExec: 
partitioning=Hash([ps_partkey@0], 2), input_partitions=2
                                         ParquetExec: file_groups={ ... }, 
projection=[ps_partkey, ps_suppkey]
@@ -66,7 +66,7 @@ Query Stage #1 (1 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: 
"p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_brand@1 != Brand#14 AND p_type@2 NOT LIKE SMALL PLATED% AND 
Use p_size@3 IN (SET) ([Literal { value: Int32(14) }, Literal { value: Int32(6) 
}, Literal { value: Int32(5) }, Literal { value: Int32(31) }, Literal { value: 
Int32(49) }, Literal { value: Int32(15) }, Literal { value: Int32(41) }, 
Literal { value: Int32(47) }])
-      ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN 
p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != 
Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = 
p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 
END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE 
p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = 
p_size_row_count@7 THEN false ELSE p_ [...]
+      ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN 
p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != 
Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = 
p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 
END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE 
p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = 
p_size_row_count@7 THEN false ELSE p_ [...]
 
 Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: 
"ps_partkey", index: 0 }], 2))
@@ -101,13 +101,13 @@ ShuffleWriterExec(stage_id=5, 
output_partitioning=Hash([Column { name: "p_brand"
 
 Query Stage #6 (2 -> 2):
 ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: 
"p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: 
"p_size", index: 2 }], 2))
-  SortExec: expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC 
NULLS LAST,p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
+  SortExec: expr=[supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, p_type@1 ASC 
NULLS LAST, p_size@2 ASC NULLS LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 
as p_size, count(alias1)@3 as supplier_cnt]
       AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, 
p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { 
name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { 
name: "p_size", index: 2 }], 2))
 
 Query Stage #7 (2 -> 1):
-SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS 
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
+SortPreservingMergeExec: [supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, 
p_type@1 ASC NULLS LAST, p_size@2 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: 
"p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: 
"p_size", index: 2 }], 2))
 
diff --git a/testdata/expected-plans/q17.txt b/testdata/expected-plans/q17.txt
index 454f0ad..6006fd6 100644
--- a/testdata/expected-plans/q17.txt
+++ b/testdata/expected-plans/q17.txt
@@ -9,8 +9,8 @@ Projection: CAST(sum(lineitem.l_extendedprice) AS Float64) / 
Float64(7) AS avg_y
           Inner Join: lineitem.l_partkey = part.p_partkey
             TableScan: lineitem projection=[l_partkey, l_quantity, 
l_extendedprice]
             Projection: part.p_partkey
-              Filter: part.p_brand = Utf8("Brand#42") AND part.p_container = 
Utf8("LG BAG")
-                TableScan: part projection=[p_partkey, p_brand, p_container], 
partial_filters=[part.p_brand = Utf8("Brand#42"), part.p_container = Utf8("LG 
BAG")]
+              Filter: part.p_brand = Utf8View("Brand#42") AND part.p_container 
= Utf8View("LG BAG")
+                TableScan: part projection=[p_partkey, p_brand, p_container], 
partial_filters=[part.p_brand = Utf8View("Brand#42"), part.p_container = 
Utf8View("LG BAG")]
         SubqueryAlias: __scalar_sq_1
           Projection: CAST(Float64(0.2) * CAST(avg(lineitem.l_quantity) AS 
Float64) AS Decimal128(30, 15)), lineitem.l_partkey
             Aggregate: groupBy=[[lineitem.l_partkey]], 
aggr=[[avg(lineitem.l_quantity)]]
diff --git a/testdata/expected-plans/q18.txt b/testdata/expected-plans/q18.txt
index 0696af7..30179d0 100644
--- a/testdata/expected-plans/q18.txt
+++ b/testdata/expected-plans/q18.txt
@@ -20,8 +20,8 @@ Sort: orders.o_totalprice DESC NULLS FIRST, 
orders.o_orderdate ASC NULLS LAST, f
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST], 
fetch=100
-  SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC,o_orderdate@3 ASC NULLS 
LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST], 
fetch=100
+  SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC, o_orderdate@3 ASC 
NULLS LAST], preserve_partitioning=[true]
     AggregateExec: mode=FinalPartitioned, gby=[c_name@0 as c_name, c_custkey@1 
as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@3 as o_orderdate, 
o_totalprice@4 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
       CoalesceBatchesExec: target_batch_size=8192
         RepartitionExec: partitioning=Hash([c_name@0, c_custkey@1, 
o_orderkey@2, o_orderdate@3, o_totalprice@4], 2), input_partitions=2
@@ -99,12 +99,12 @@ ShuffleWriterExec(stage_id=5, 
output_partitioning=Hash([Column { name: "c_name",
 
 Query Stage #6 (2 -> 2):
 ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: 
"c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: 
"o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { 
name: "o_totalprice", index: 4 }], 2))
-  SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC,o_orderdate@3 ASC NULLS 
LAST], preserve_partitioning=[true]
+  SortExec: TopK(fetch=100), expr=[o_totalprice@4 DESC, o_orderdate@3 ASC 
NULLS LAST], preserve_partitioning=[true]
     AggregateExec: mode=FinalPartitioned, gby=[c_name@0 as c_name, c_custkey@1 
as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@3 as o_orderdate, 
o_totalprice@4 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
       CoalesceBatchesExec: target_batch_size=8192
         ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: 
"c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: 
"o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { 
name: "o_totalprice", index: 4 }], 2))
 
 Query Stage #7 (2 -> 1):
-SortPreservingMergeExec: [o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST], 
fetch=100
+SortPreservingMergeExec: [o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST], 
fetch=100
   ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: 
"c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: 
"o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { 
name: "o_totalprice", index: 4 }], 2))
 
diff --git a/testdata/expected-plans/q19.txt b/testdata/expected-plans/q19.txt
index c98f39e..c2e9025 100644
--- a/testdata/expected-plans/q19.txt
+++ b/testdata/expected-plans/q19.txt
@@ -4,12 +4,12 @@ DataFusion Logical Plan
 Projection: sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS 
revenue
   Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice * 
(Decimal128(Some(1),20,0) - lineitem.l_discount)) AS 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
     Projection: lineitem.l_extendedprice, lineitem.l_discount
-      Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = 
Utf8("Brand#21") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), 
Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= 
Decimal128(Some(800),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(1800),11,2) AND part.p_size <= Int32(5) OR part.p_brand = 
Utf8("Brand#13") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), 
Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quantity >= Decimal128 [...]
+      Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = 
Utf8View("Brand#21") AND part.p_container IN ([Utf8View("SM CASE"), 
Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND 
lineitem.l_quantity >= Decimal128(Some(800),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(1800),11,2) AND part.p_size <= Int32(5) OR part.p_brand = 
Utf8View("Brand#13") AND part.p_container IN ([Utf8View("MED BAG"), 
Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK") [...]
         Projection: lineitem.l_partkey, lineitem.l_quantity, 
lineitem.l_extendedprice, lineitem.l_discount
-          Filter: (lineitem.l_quantity >= Decimal128(Some(800),11,2) AND 
lineitem.l_quantity <= Decimal128(Some(1800),11,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),11,2) OR lineitem.l_quantity >= 
Decimal128(Some(3000),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(4000),11,2)) AND (lineitem.l_shipmode = Utf8("AIR") OR 
lineitem.l_shipmode = Utf8("AIR REG")) AND lineitem.l_shipinstruct = 
Utf8("DELIVER IN PERSON")
-            TableScan: lineitem projection=[l_partkey, l_quantity, 
l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
partial_filters=[lineitem.l_shipmode = Utf8("AIR") OR lineitem.l_shipmode = 
Utf8("AIR REG"), lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON"), 
lineitem.l_quantity >= Decimal128(Some(800),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(1800),11,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),11,2) OR line [...]
-        Filter: (part.p_brand = Utf8("Brand#21") AND part.p_container IN 
([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND 
part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#13") AND part.p_container 
IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND 
part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#52") AND 
part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), 
Utf8("LG PKG")]) AND part.p_size <= Int32(15)) AND  [...]
-          TableScan: part projection=[p_partkey, p_brand, p_size, 
p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = 
Utf8("Brand#21") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), 
Utf8("SM PACK"), Utf8("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = 
Utf8("Brand#13") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), 
Utf8("MED PKG"), Utf8("MED PACK")]) AND part.p_size <= Int32(10) OR 
part.p_brand = Utf8("Brand#52") AND part.p_container I [...]
+          Filter: (lineitem.l_quantity >= Decimal128(Some(800),11,2) AND 
lineitem.l_quantity <= Decimal128(Some(1800),11,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),11,2) OR lineitem.l_quantity >= 
Decimal128(Some(3000),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(4000),11,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR 
lineitem.l_shipmode = Utf8View("AIR REG")) AND lineitem.l_shipinstruct = 
Utf8View("DELIVER IN PERSON")
+            TableScan: lineitem projection=[l_partkey, l_quantity, 
l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
partial_filters=[lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = 
Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON"), 
lineitem.l_quantity >= Decimal128(Some(800),11,2) AND lineitem.l_quantity <= 
Decimal128(Some(1800),11,2) OR lineitem.l_quantity >= 
Decimal128(Some(2000),11,2) AND lineitem.l_quantity <= Decimal128(Some(3000),1 
[...]
+        Filter: (part.p_brand = Utf8View("Brand#21") AND part.p_container IN 
([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM 
PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#13") AND 
part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED 
PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = 
Utf8View("Brand#52") AND part.p_container IN ([Utf8View("LG CASE"), 
Utf8View("LG BOX"), Utf8View("LG PAC [...]
+          TableScan: part projection=[p_partkey, p_brand, p_size, 
p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = 
Utf8View("Brand#21") AND part.p_container IN ([Utf8View("SM CASE"), 
Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size 
<= Int32(5) OR part.p_brand = Utf8View("Brand#13") AND part.p_container IN 
([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED 
PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = [...]
 
 DataFusion Physical Plan
 ========================
@@ -19,18 +19,18 @@ ProjectionExec: expr=[sum(lineitem.l_extendedprice * 
Int64(1) - lineitem.l_disco
     CoalescePartitionsExec
       AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice 
* Int64(1) - lineitem.l_discount)]
         CoalesceBatchesExec: target_batch_size=8192
-          HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, 
l_partkey@0)], filter=p_brand@1 = Brand#21 AND Use p_container@3 IN (SET) 
([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, 
Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND 
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2 
<= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value: 
Utf8("MED BAG") }, Literal { valu [...]
+          HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, 
l_partkey@0)], filter=p_brand@1 = Brand#21 AND p_container@3 IN ([Literal { 
value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") }, Literal { 
value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") }]) AND 
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2 
<= 5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal { value: 
Utf8View("MED BAG") }, Literal { valu [...]
             CoalesceBatchesExec: target_batch_size=8192
               RepartitionExec: partitioning=Hash([p_partkey@0], 2), 
input_partitions=2
                 CoalesceBatchesExec: target_batch_size=8192
-                  FilterExec: (p_brand@1 = Brand#21 AND Use p_container@3 IN 
(SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, 
Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND 
p_size@2 <= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal 
{ value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { 
value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 
10 OR p_brand@1 = Brand#52  [...]
+                  FilterExec: (p_brand@1 = Brand#21 AND p_container@3 IN 
([Literal { value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") 
}, Literal { value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") 
}]) AND p_size@2 <= 5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal { 
value: Utf8View("MED BAG") }, Literal { value: Utf8View("MED BOX") }, Literal { 
value: Utf8View("MED PKG") }, Literal { value: Utf8View("MED PACK") }]) AND 
p_size@2 <= 10 OR p_brand@1 [...]
                     RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                       ParquetExec: file_groups={ ... }]) AND p_size@5 <= 15), 
pruning_predicate=CASE WHEN p_size_null_count@1 = p_size_row_count@2 THEN false 
ELSE p_size_max@0 >= 1 END AND (CASE WHEN p_brand_null_count@5 = 
p_brand_row_count@6 THEN false ELSE p_brand_min@3 <= Brand#21 AND Brand#21 <= 
p_brand_max@4 END AND (CASE WHEN p_container_null_count@9 = 
p_container_row_count@10 THEN false ELSE p_container_min@7 <= SM CASE AND SM 
CASE <= p_container_max@8 END OR CASE WHEN p_container [...]
             CoalesceBatchesExec: target_batch_size=8192
               RepartitionExec: partitioning=Hash([l_partkey@0], 2), 
input_partitions=2
                 CoalesceBatchesExec: target_batch_size=8192
                   FilterExec: (l_quantity@1 >= Some(800),11,2 AND l_quantity@1 
<= Some(1800),11,2 OR l_quantity@1 >= Some(2000),11,2 AND l_quantity@1 <= 
Some(3000),11,2 OR l_quantity@1 >= Some(3000),11,2 AND l_quantity@1 <= 
Some(4000),11,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND 
l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0, l_quantity@1, 
l_extendedprice@2, l_discount@3]
-                    ParquetExec: file_groups={ ... }, projection=[l_partkey, 
l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND 
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND 
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND 
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND 
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN [...]
+                    ParquetExec: file_groups={ ... }, projection=[l_partkey, 
l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND 
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND 
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND 
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND 
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN [...]
 
 DataFusion Ray Distributed Plan
 ===========
@@ -38,20 +38,20 @@ DataFusion Ray Distributed Plan
 Query Stage #0 (1 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: 
"p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
-    FilterExec: (p_brand@1 = Brand#21 AND Use p_container@3 IN (SET) ([Literal 
{ value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value: 
Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND p_size@2 <= 5 OR 
p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value: 
Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value: 
Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 10 OR 
p_brand@1 = Brand#52 AND Use p_cont [...]
+    FilterExec: (p_brand@1 = Brand#21 AND p_container@3 IN ([Literal { value: 
Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") }, Literal { value: 
Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") }]) AND p_size@2 <= 
5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal { value: Utf8View("MED 
BAG") }, Literal { value: Utf8View("MED BOX") }, Literal { value: Utf8View("MED 
PKG") }, Literal { value: Utf8View("MED PACK") }]) AND p_size@2 <= 10 OR 
p_brand@1 = Brand#52 AN [...]
       ParquetExec: file_groups={ ... }]) AND p_size@5 <= 15), 
pruning_predicate=CASE WHEN p_size_null_count@1 = p_size_row_count@2 THEN false 
ELSE p_size_max@0 >= 1 END AND (CASE WHEN p_brand_null_count@5 = 
p_brand_row_count@6 THEN false ELSE p_brand_min@3 <= Brand#21 AND Brand#21 <= 
p_brand_max@4 END AND (CASE WHEN p_container_null_count@9 = 
p_container_row_count@10 THEN false ELSE p_container_min@7 <= SM CASE AND SM 
CASE <= p_container_max@8 END OR CASE WHEN p_container_null_count@9 =  [...]
 
 Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: 
"l_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: (l_quantity@1 >= Some(800),11,2 AND l_quantity@1 <= 
Some(1800),11,2 OR l_quantity@1 >= Some(2000),11,2 AND l_quantity@1 <= 
Some(3000),11,2 OR l_quantity@1 >= Some(3000),11,2 AND l_quantity@1 <= 
Some(4000),11,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND 
l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0, l_quantity@1, 
l_extendedprice@2, l_discount@3]
-      ParquetExec: file_groups={ ... }, projection=[l_partkey, l_quantity, 
l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND 
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND 
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND 
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND 
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN l_shipmode_nu 
[...]
+      ParquetExec: file_groups={ ... }, projection=[l_partkey, l_quantity, 
l_extendedprice, l_discount, l_shipinstruct, l_shipmode], 
predicate=(l_shipmode@14 = AIR OR l_shipmode@14 = AIR REG) AND 
l_shipinstruct@13 = DELIVER IN PERSON AND (l_quantity@4 >= Some(800),11,2 AND 
l_quantity@4 <= Some(1800),11,2 OR l_quantity@4 >= Some(2000),11,2 AND 
l_quantity@4 <= Some(3000),11,2 OR l_quantity@4 >= Some(3000),11,2 AND 
l_quantity@4 <= Some(4000),11,2), pruning_predicate=(CASE WHEN l_shipmode_nu 
[...]
 
 Query Stage #2 (2 -> 1):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([], 2))
   AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount)]
     CoalesceBatchesExec: target_batch_size=8192
-      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, 
l_partkey@0)], filter=p_brand@1 = Brand#21 AND Use p_container@3 IN (SET) 
([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, 
Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND 
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2 
<= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value: 
Utf8("MED BAG") }, Literal { value: U [...]
+      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, 
l_partkey@0)], filter=p_brand@1 = Brand#21 AND p_container@3 IN ([Literal { 
value: Utf8View("SM CASE") }, Literal { value: Utf8View("SM BOX") }, Literal { 
value: Utf8View("SM PACK") }, Literal { value: Utf8View("SM PKG") }]) AND 
l_quantity@0 >= Some(800),11,2 AND l_quantity@0 <= Some(1800),11,2 AND p_size@2 
<= 5 OR p_brand@1 = Brand#13 AND p_container@3 IN ([Literal { value: 
Utf8View("MED BAG") }, Literal { value: U [...]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { 
name: "p_partkey", index: 0 }], 2))
         CoalesceBatchesExec: target_batch_size=8192
diff --git a/testdata/expected-plans/q2.txt b/testdata/expected-plans/q2.txt
index cb67479..bc0713c 100644
--- a/testdata/expected-plans/q2.txt
+++ b/testdata/expected-plans/q2.txt
@@ -13,14 +13,14 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name 
ASC NULLS LAST, supplie
                   Projection: part.p_partkey, part.p_mfgr, 
partsupp.ps_suppkey, partsupp.ps_supplycost
                     Inner Join: part.p_partkey = partsupp.ps_partkey
                       Projection: part.p_partkey, part.p_mfgr
-                        Filter: part.p_size = Int32(48) AND part.p_type LIKE 
Utf8("%TIN")
-                          TableScan: part projection=[p_partkey, p_mfgr, 
p_type, p_size], partial_filters=[part.p_size = Int32(48), part.p_type LIKE 
Utf8("%TIN")]
+                        Filter: part.p_size = Int32(48) AND part.p_type LIKE 
Utf8View("%TIN")
+                          TableScan: part projection=[p_partkey, p_mfgr, 
p_type, p_size], partial_filters=[part.p_size = Int32(48), part.p_type LIKE 
Utf8View("%TIN")]
                       TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_supplycost]
                   TableScan: supplier projection=[s_suppkey, s_name, 
s_address, s_nationkey, s_phone, s_acctbal, s_comment]
               TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
           Projection: region.r_regionkey
-            Filter: region.r_name = Utf8("ASIA")
-              TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("ASIA")]
+            Filter: region.r_name = Utf8View("ASIA")
+              TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("ASIA")]
       SubqueryAlias: __scalar_sq_1
         Projection: min(partsupp.ps_supplycost), partsupp.ps_partkey
           Aggregate: groupBy=[[partsupp.ps_partkey]], 
aggr=[[min(partsupp.ps_supplycost)]]
@@ -34,14 +34,14 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name 
ASC NULLS LAST, supplie
                         TableScan: supplier projection=[s_suppkey, s_nationkey]
                     TableScan: nation projection=[n_nationkey, n_regionkey]
                 Projection: region.r_regionkey
-                  Filter: region.r_name = Utf8("ASIA")
-                    TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("ASIA")]
+                  Filter: region.r_name = Utf8View("ASIA")
+                    TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("ASIA")]
 
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 
ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], fetch=100
-  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS 
LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], 
preserve_partitioning=[true]
+SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 
ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=100
+  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, 
s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], 
preserve_partitioning=[true]
     ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, 
n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 
as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
       CoalesceBatchesExec: target_batch_size=8192
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, 
ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], 
projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4, 
s_acctbal@5, s_comment@6, n_name@8]
@@ -51,9 +51,9 @@ SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS 
LAST,s_name@1 ASC
                 HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(r_regionkey@0, n_regionkey@9)], projection=[p_partkey@1, p_mfgr@2, 
s_name@3, s_address@4, s_phone@5, s_acctbal@6, s_comment@7, ps_supplycost@8, 
n_name@9]
                   CoalesceBatchesExec: target_batch_size=8192
                     RepartitionExec: partitioning=Hash([r_regionkey@0], 2), 
input_partitions=2
-                      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-                        CoalesceBatchesExec: target_batch_size=8192
-                          FilterExec: r_name@1 = ASIA, 
projection=[r_regionkey@0]
+                      CoalesceBatchesExec: target_batch_size=8192
+                        FilterExec: r_name@1 = ASIA, projection=[r_regionkey@0]
+                          RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                             ParquetExec: file_groups={ ... }, 
projection=[r_regionkey, r_name], predicate=r_name@1 = ASIA, 
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false 
ELSE r_name_min@0 <= ASIA AND ASIA <= r_name_max@1 END, 
required_guarantees=[r_name in (ASIA)]
                   CoalesceBatchesExec: target_batch_size=8192
                     RepartitionExec: partitioning=Hash([n_regionkey@9], 2), 
input_partitions=2
@@ -96,9 +96,9 @@ SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS 
LAST,s_name@1 ASC
                           HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(r_regionkey@0, n_regionkey@2)], projection=[ps_partkey@1, ps_supplycost@2]
                             CoalesceBatchesExec: target_batch_size=8192
                               RepartitionExec: 
partitioning=Hash([r_regionkey@0], 2), input_partitions=2
-                                RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
-                                  CoalesceBatchesExec: target_batch_size=8192
-                                    FilterExec: r_name@1 = ASIA, 
projection=[r_regionkey@0]
+                                CoalesceBatchesExec: target_batch_size=8192
+                                  FilterExec: r_name@1 = ASIA, 
projection=[r_regionkey@0]
+                                    RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
                                       ParquetExec: file_groups={ ... }, 
projection=[r_regionkey, r_name], predicate=r_name@1 = ASIA, 
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false 
ELSE r_name_min@0 <= ASIA AND ASIA <= r_name_max@1 END, 
required_guarantees=[r_name in (ASIA)]
                             CoalesceBatchesExec: target_batch_size=8192
                               RepartitionExec: 
partitioning=Hash([n_regionkey@2], 2), input_partitions=2
@@ -243,7 +243,7 @@ ShuffleWriterExec(stage_id=16, 
output_partitioning=Hash([Column { name: "ps_part
 
 Query Stage #17 (2 -> 2):
 ShuffleWriterExec(stage_id=17, output_partitioning=Hash([Column { name: 
"p_partkey", index: 3 }], 2))
-  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS 
LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], 
preserve_partitioning=[true]
+  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, 
s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], 
preserve_partitioning=[true]
     ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, 
n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 
as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
       CoalesceBatchesExec: target_batch_size=8192
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, 
ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], 
projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4, 
s_acctbal@5, s_comment@6, n_name@8]
@@ -253,6 +253,6 @@ ShuffleWriterExec(stage_id=17, 
output_partitioning=Hash([Column { name: "p_partk
             ShuffleReaderExec(stage_id=16, input_partitioning=Hash([Column { 
name: "ps_partkey", index: 1 }, Column { name: "min(partsupp.ps_supplycost)", 
index: 0 }], 2))
 
 Query Stage #18 (2 -> 1):
-SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 
ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], fetch=100
+SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 
ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=100
   ShuffleReaderExec(stage_id=17, input_partitioning=Hash([Column { name: 
"p_partkey", index: 3 }], 2))
 
diff --git a/testdata/expected-plans/q20.txt b/testdata/expected-plans/q20.txt
index 5473093..13b21c8 100644
--- a/testdata/expected-plans/q20.txt
+++ b/testdata/expected-plans/q20.txt
@@ -3,22 +3,22 @@ DataFusion Logical Plan
 
 Sort: supplier.s_name ASC NULLS LAST
   Projection: supplier.s_name, supplier.s_address
-    LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
+    LeftSemi Join: supplier.s_suppkey = __correlated_sq_2.ps_suppkey
       Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address
         Inner Join: supplier.s_nationkey = nation.n_nationkey
           TableScan: supplier projection=[s_suppkey, s_name, s_address, 
s_nationkey]
           Projection: nation.n_nationkey
-            Filter: nation.n_name = Utf8("KENYA")
-              TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("KENYA")]
-      SubqueryAlias: __correlated_sq_1
+            Filter: nation.n_name = Utf8View("KENYA")
+              TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("KENYA")]
+      SubqueryAlias: __correlated_sq_2
         Projection: partsupp.ps_suppkey
           Inner Join: partsupp.ps_partkey = __scalar_sq_3.l_partkey, 
partsupp.ps_suppkey = __scalar_sq_3.l_suppkey Filter: CAST(partsupp.ps_availqty 
AS Float64) > __scalar_sq_3.Float64(0.5) * sum(lineitem.l_quantity)
-            LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
+            LeftSemi Join: partsupp.ps_partkey = __correlated_sq_1.p_partkey
               TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_availqty]
-              SubqueryAlias: __correlated_sq_2
+              SubqueryAlias: __correlated_sq_1
                 Projection: part.p_partkey
-                  Filter: part.p_name LIKE Utf8("blanched%")
-                    TableScan: part projection=[p_partkey, p_name], 
partial_filters=[part.p_name LIKE Utf8("blanched%")]
+                  Filter: part.p_name LIKE Utf8View("blanched%")
+                    TableScan: part projection=[p_partkey, p_name], 
partial_filters=[part.p_name LIKE Utf8View("blanched%")]
             SubqueryAlias: __scalar_sq_3
               Projection: Float64(0.5) * CAST(sum(lineitem.l_quantity) AS 
Float64), lineitem.l_partkey, lineitem.l_suppkey
                 Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], 
aggr=[[sum(lineitem.l_quantity)]]
@@ -39,9 +39,9 @@ SortPreservingMergeExec: [s_name@0 ASC NULLS LAST]
               HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(n_nationkey@0, s_nationkey@3)], projection=[s_suppkey@1, s_name@2, 
s_address@3]
                 CoalesceBatchesExec: target_batch_size=8192
                   RepartitionExec: partitioning=Hash([n_nationkey@0], 2), 
input_partitions=2
-                    RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-                      CoalesceBatchesExec: target_batch_size=8192
-                        FilterExec: n_name@1 = KENYA, 
projection=[n_nationkey@0]
+                    CoalesceBatchesExec: target_batch_size=8192
+                      FilterExec: n_name@1 = KENYA, projection=[n_nationkey@0]
+                        RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                           ParquetExec: file_groups={ ... }, 
projection=[n_nationkey, n_name], predicate=n_name@1 = KENYA, 
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false 
ELSE n_name_min@0 <= KENYA AND KENYA <= n_name_max@1 END, 
required_guarantees=[n_name in (KENYA)]
                 CoalesceBatchesExec: target_batch_size=8192
                   RepartitionExec: partitioning=Hash([s_nationkey@3], 2), 
input_partitions=2
diff --git a/testdata/expected-plans/q21.txt b/testdata/expected-plans/q21.txt
index dbd5e97..b88bccc 100644
--- a/testdata/expected-plans/q21.txt
+++ b/testdata/expected-plans/q21.txt
@@ -19,11 +19,11 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS 
LAST, fetch=100
                             Filter: lineitem.l_receiptdate > 
lineitem.l_commitdate
                               TableScan: lineitem projection=[l_orderkey, 
l_suppkey, l_commitdate, l_receiptdate], 
partial_filters=[lineitem.l_receiptdate > lineitem.l_commitdate]
                     Projection: orders.o_orderkey
-                      Filter: orders.o_orderstatus = Utf8("F")
-                        TableScan: orders projection=[o_orderkey, 
o_orderstatus], partial_filters=[orders.o_orderstatus = Utf8("F")]
+                      Filter: orders.o_orderstatus = Utf8View("F")
+                        TableScan: orders projection=[o_orderkey, 
o_orderstatus], partial_filters=[orders.o_orderstatus = Utf8View("F")]
                 Projection: nation.n_nationkey
-                  Filter: nation.n_name = Utf8("ARGENTINA")
-                    TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("ARGENTINA")]
+                  Filter: nation.n_name = Utf8View("ARGENTINA")
+                    TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("ARGENTINA")]
             SubqueryAlias: __correlated_sq_1
               SubqueryAlias: l2
                 TableScan: lineitem projection=[l_orderkey, l_suppkey]
@@ -36,8 +36,8 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS 
LAST, fetch=100
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS LAST], fetch=100
-  SortExec: TopK(fetch=100), expr=[numwait@1 DESC,s_name@0 ASC NULLS LAST], 
preserve_partitioning=[true]
+SortPreservingMergeExec: [numwait@1 DESC, s_name@0 ASC NULLS LAST], fetch=100
+  SortExec: TopK(fetch=100), expr=[numwait@1 DESC, s_name@0 ASC NULLS LAST], 
preserve_partitioning=[true]
     ProjectionExec: expr=[s_name@0 as s_name, count(*)@1 as numwait]
       AggregateExec: mode=FinalPartitioned, gby=[s_name@0 as s_name], 
aggr=[count(*)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -53,9 +53,9 @@ SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS 
LAST], fetch=100
                             HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(n_nationkey@0, s_nationkey@1)], projection=[s_name@1, l_orderkey@3, 
l_suppkey@4]
                               CoalesceBatchesExec: target_batch_size=8192
                                 RepartitionExec: 
partitioning=Hash([n_nationkey@0], 2), input_partitions=2
-                                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
-                                    CoalesceBatchesExec: target_batch_size=8192
-                                      FilterExec: n_name@1 = ARGENTINA, 
projection=[n_nationkey@0]
+                                  CoalesceBatchesExec: target_batch_size=8192
+                                    FilterExec: n_name@1 = ARGENTINA, 
projection=[n_nationkey@0]
+                                      RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
                                         ParquetExec: file_groups={ ... }, 
projection=[n_nationkey, n_name], predicate=n_name@1 = ARGENTINA, 
pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false 
ELSE n_name_min@0 <= ARGENTINA AND ARGENTINA <= n_name_max@1 END, 
required_guarantees=[n_name in (ARGENTINA)]
                               CoalesceBatchesExec: target_batch_size=8192
                                 RepartitionExec: 
partitioning=Hash([s_nationkey@1], 2), input_partitions=2
@@ -166,13 +166,13 @@ ShuffleWriterExec(stage_id=9, 
output_partitioning=Hash([Column { name: "s_name",
 
 Query Stage #10 (2 -> 2):
 ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name: 
"s_name", index: 0 }], 2))
-  SortExec: TopK(fetch=100), expr=[numwait@1 DESC,s_name@0 ASC NULLS LAST], 
preserve_partitioning=[true]
+  SortExec: TopK(fetch=100), expr=[numwait@1 DESC, s_name@0 ASC NULLS LAST], 
preserve_partitioning=[true]
     ProjectionExec: expr=[s_name@0 as s_name, count(*)@1 as numwait]
       AggregateExec: mode=FinalPartitioned, gby=[s_name@0 as s_name], 
aggr=[count(*)]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=9, input_partitioning=Hash([Column { 
name: "s_name", index: 0 }], 2))
 
 Query Stage #11 (2 -> 1):
-SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS LAST], fetch=100
+SortPreservingMergeExec: [numwait@1 DESC, s_name@0 ASC NULLS LAST], fetch=100
   ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: 
"s_name", index: 0 }], 2))
 
diff --git a/testdata/expected-plans/q22.txt b/testdata/expected-plans/q22.txt
index d46d5d5..da693fb 100644
--- a/testdata/expected-plans/q22.txt
+++ b/testdata/expected-plans/q22.txt
@@ -9,15 +9,15 @@ Sort: custsale.cntrycode ASC NULLS LAST
           Inner Join:  Filter: CAST(customer.c_acctbal AS Decimal128(15, 6)) > 
__scalar_sq_2.avg(customer.c_acctbal)
             Projection: customer.c_phone, customer.c_acctbal
               LeftAnti Join: customer.c_custkey = __correlated_sq_1.o_custkey
-                Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN 
([Utf8("24"), Utf8("34"), Utf8("16"), Utf8("30"), Utf8("33"), Utf8("14"), 
Utf8("13")])
-                  TableScan: customer projection=[c_custkey, c_phone, 
c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN 
([Utf8("24"), Utf8("34"), Utf8("16"), Utf8("30"), Utf8("33"), Utf8("14"), 
Utf8("13")])]
+                Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN 
([Utf8View("24"), Utf8View("34"), Utf8View("16"), Utf8View("30"), 
Utf8View("33"), Utf8View("14"), Utf8View("13")])
+                  TableScan: customer projection=[c_custkey, c_phone, 
c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN 
([Utf8View("24"), Utf8View("34"), Utf8View("16"), Utf8View("30"), 
Utf8View("33"), Utf8View("14"), Utf8View("13")])]
                 SubqueryAlias: __correlated_sq_1
                   TableScan: orders projection=[o_custkey]
             SubqueryAlias: __scalar_sq_2
               Aggregate: groupBy=[[]], aggr=[[avg(customer.c_acctbal)]]
                 Projection: customer.c_acctbal
-                  Filter: customer.c_acctbal > Decimal128(Some(0),11,2) AND 
substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("24"), Utf8("34"), 
Utf8("16"), Utf8("30"), Utf8("33"), Utf8("14"), Utf8("13")])
-                    TableScan: customer projection=[c_phone, c_acctbal], 
partial_filters=[customer.c_acctbal > Decimal128(Some(0),11,2) AS 
customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, 
Int64(1), Int64(2)) IN ([Utf8("24"), Utf8("34"), Utf8("16"), Utf8("30"), 
Utf8("33"), Utf8("14"), Utf8("13")]), customer.c_acctbal > 
Decimal128(Some(0),11,2)]
+                  Filter: customer.c_acctbal > Decimal128(Some(0),11,2) AND 
substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("24"), 
Utf8View("34"), Utf8View("16"), Utf8View("30"), Utf8View("33"), Utf8View("14"), 
Utf8View("13")])
+                    TableScan: customer projection=[c_phone, c_acctbal], 
partial_filters=[customer.c_acctbal > Decimal128(Some(0),11,2), 
substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("24"), 
Utf8View("34"), Utf8View("16"), Utf8View("30"), Utf8View("33"), Utf8View("14"), 
Utf8View("13")])]
 
 DataFusion Physical Plan
 ========================
@@ -35,14 +35,14 @@ SortPreservingMergeExec: [cntrycode@0 ASC NULLS LAST]
                     CoalescePartitionsExec
                       AggregateExec: mode=Partial, gby=[], 
aggr=[avg(customer.c_acctbal)]
                         CoalesceBatchesExec: target_batch_size=8192
-                          FilterExec: c_acctbal@1 > Some(0),11,2 AND Use 
substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("24") }, Literal { 
value: Utf8("34") }, Literal { value: Utf8("16") }, Literal { value: Utf8("30") 
}, Literal { value: Utf8("33") }, Literal { value: Utf8("14") }, Literal { 
value: Utf8("13") }]), projection=[c_acctbal@1]
-                            ParquetExec: file_groups={ ... }]) AND c_acctbal@5 
> Some(0),11,2, pruning_predicate=CASE WHEN c_acctbal_null_count@1 = 
c_acctbal_row_count@2 THEN false ELSE c_acctbal_max@0 > Some(0),11,2 END AND 
CASE WHEN c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN false ELSE 
c_acctbal_max@0 > Some(0),11,2 END, required_guarantees=[]
+                          FilterExec: c_acctbal@1 > Some(0),11,2 AND 
substr(c_phone@0, 1, 2) IN ([Literal { value: Utf8View("24") }, Literal { 
value: Utf8View("34") }, Literal { value: Utf8View("16") }, Literal { value: 
Utf8View("30") }, Literal { value: Utf8View("33") }, Literal { value: 
Utf8View("14") }, Literal { value: Utf8View("13") }]), projection=[c_acctbal@1]
+                            ParquetExec: file_groups={ ... }]), 
pruning_predicate=CASE WHEN c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN 
false ELSE c_acctbal_max@0 > Some(0),11,2 END, required_guarantees=[]
                   CoalesceBatchesExec: target_batch_size=8192
                     HashJoinExec: mode=Partitioned, join_type=LeftAnti, 
on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
                       CoalesceBatchesExec: target_batch_size=8192
                         RepartitionExec: partitioning=Hash([c_custkey@0], 2), 
input_partitions=2
                           CoalesceBatchesExec: target_batch_size=8192
-                            FilterExec: Use substr(c_phone@1, 1, 2) IN (SET) 
([Literal { value: Utf8("24") }, Literal { value: Utf8("34") }, Literal { 
value: Utf8("16") }, Literal { value: Utf8("30") }, Literal { value: Utf8("33") 
}, Literal { value: Utf8("14") }, Literal { value: Utf8("13") }])
+                            FilterExec: substr(c_phone@1, 1, 2) IN ([Literal { 
value: Utf8View("24") }, Literal { value: Utf8View("34") }, Literal { value: 
Utf8View("16") }, Literal { value: Utf8View("30") }, Literal { value: 
Utf8View("33") }, Literal { value: Utf8View("14") }, Literal { value: 
Utf8View("13") }])
                               ParquetExec: file_groups={ ... }])
                       CoalesceBatchesExec: target_batch_size=8192
                         RepartitionExec: partitioning=Hash([o_custkey@0], 2), 
input_partitions=2
@@ -55,13 +55,13 @@ Query Stage #0 (2 -> 1):
 ShuffleWriterExec(stage_id=0, output_partitioning=UnknownPartitioning(2))
   AggregateExec: mode=Partial, gby=[], aggr=[avg(customer.c_acctbal)]
     CoalesceBatchesExec: target_batch_size=8192
-      FilterExec: c_acctbal@1 > Some(0),11,2 AND Use substr(c_phone@0, 1, 2) 
IN (SET) ([Literal { value: Utf8("24") }, Literal { value: Utf8("34") }, 
Literal { value: Utf8("16") }, Literal { value: Utf8("30") }, Literal { value: 
Utf8("33") }, Literal { value: Utf8("14") }, Literal { value: Utf8("13") }]), 
projection=[c_acctbal@1]
-        ParquetExec: file_groups={ ... }]) AND c_acctbal@5 > Some(0),11,2, 
pruning_predicate=CASE WHEN c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN 
false ELSE c_acctbal_max@0 > Some(0),11,2 END AND CASE WHEN 
c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN false ELSE c_acctbal_max@0 
> Some(0),11,2 END, required_guarantees=[]
+      FilterExec: c_acctbal@1 > Some(0),11,2 AND substr(c_phone@0, 1, 2) IN 
([Literal { value: Utf8View("24") }, Literal { value: Utf8View("34") }, Literal 
{ value: Utf8View("16") }, Literal { value: Utf8View("30") }, Literal { value: 
Utf8View("33") }, Literal { value: Utf8View("14") }, Literal { value: 
Utf8View("13") }]), projection=[c_acctbal@1]
+        ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN 
c_acctbal_null_count@1 = c_acctbal_row_count@2 THEN false ELSE c_acctbal_max@0 
> Some(0),11,2 END, required_guarantees=[]
 
 Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: 
"c_custkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
-    FilterExec: Use substr(c_phone@1, 1, 2) IN (SET) ([Literal { value: 
Utf8("24") }, Literal { value: Utf8("34") }, Literal { value: Utf8("16") }, 
Literal { value: Utf8("30") }, Literal { value: Utf8("33") }, Literal { value: 
Utf8("14") }, Literal { value: Utf8("13") }])
+    FilterExec: substr(c_phone@1, 1, 2) IN ([Literal { value: Utf8View("24") 
}, Literal { value: Utf8View("34") }, Literal { value: Utf8View("16") }, 
Literal { value: Utf8View("30") }, Literal { value: Utf8View("33") }, Literal { 
value: Utf8View("14") }, Literal { value: Utf8View("13") }])
       ParquetExec: file_groups={ ... }])
 
 Query Stage #2 (2 -> 2):
diff --git a/testdata/expected-plans/q3.txt b/testdata/expected-plans/q3.txt
index 6fd8791..f9039d3 100644
--- a/testdata/expected-plans/q3.txt
+++ b/testdata/expected-plans/q3.txt
@@ -9,8 +9,8 @@ Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS 
LAST, fetch=10
           Projection: orders.o_orderkey, orders.o_orderdate, 
orders.o_shippriority
             Inner Join: customer.c_custkey = orders.o_custkey
               Projection: customer.c_custkey
-                Filter: customer.c_mktsegment = Utf8("BUILDING")
-                  TableScan: customer projection=[c_custkey, c_mktsegment], 
partial_filters=[customer.c_mktsegment = Utf8("BUILDING")]
+                Filter: customer.c_mktsegment = Utf8View("BUILDING")
+                  TableScan: customer projection=[c_custkey, c_mktsegment], 
partial_filters=[customer.c_mktsegment = Utf8View("BUILDING")]
               Filter: orders.o_orderdate < Date32("1995-03-15")
                 TableScan: orders projection=[o_orderkey, o_custkey, 
o_orderdate, o_shippriority], partial_filters=[orders.o_orderdate < 
Date32("1995-03-15")]
           Projection: lineitem.l_orderkey, lineitem.l_extendedprice, 
lineitem.l_discount
@@ -20,8 +20,8 @@ Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS 
LAST, fetch=10
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST], 
fetch=10
-  SortExec: TopK(fetch=10), expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS 
LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST], 
fetch=10
+  SortExec: TopK(fetch=10), expr=[revenue@1 DESC, o_orderdate@2 ASC NULLS 
LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[l_orderkey@0 as l_orderkey, 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, 
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
       AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, 
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], 
aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -91,13 +91,13 @@ ShuffleWriterExec(stage_id=4, 
output_partitioning=Hash([Column { name: "l_orderk
 
 Query Stage #5 (2 -> 2):
 ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: 
"l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column { 
name: "o_shippriority", index: 3 }], 2))
-  SortExec: TopK(fetch=10), expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS 
LAST], preserve_partitioning=[true]
+  SortExec: TopK(fetch=10), expr=[revenue@1 DESC, o_orderdate@2 ASC NULLS 
LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[l_orderkey@0 as l_orderkey, 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, 
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
       AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, 
o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], 
aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=4, input_partitioning=Hash([Column { 
name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, 
Column { name: "o_shippriority", index: 2 }], 2))
 
 Query Stage #6 (2 -> 1):
-SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST], 
fetch=10
+SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST], 
fetch=10
   ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: 
"l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column { 
name: "o_shippriority", index: 3 }], 2))
 
diff --git a/testdata/expected-plans/q5.txt b/testdata/expected-plans/q5.txt
index 5351e06..2bacb27 100644
--- a/testdata/expected-plans/q5.txt
+++ b/testdata/expected-plans/q5.txt
@@ -22,8 +22,8 @@ Sort: revenue DESC NULLS FIRST
                   TableScan: supplier projection=[s_suppkey, s_nationkey]
               TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
           Projection: region.r_regionkey
-            Filter: region.r_name = Utf8("AFRICA")
-              TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("AFRICA")]
+            Filter: region.r_name = Utf8View("AFRICA")
+              TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("AFRICA")]
 
 DataFusion Physical Plan
 ========================
@@ -39,9 +39,9 @@ SortPreservingMergeExec: [revenue@1 DESC]
                 HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(r_regionkey@0, n_regionkey@3)], projection=[l_extendedprice@1, 
l_discount@2, n_name@3]
                   CoalesceBatchesExec: target_batch_size=8192
                     RepartitionExec: partitioning=Hash([r_regionkey@0], 2), 
input_partitions=2
-                      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-                        CoalesceBatchesExec: target_batch_size=8192
-                          FilterExec: r_name@1 = AFRICA, 
projection=[r_regionkey@0]
+                      CoalesceBatchesExec: target_batch_size=8192
+                        FilterExec: r_name@1 = AFRICA, 
projection=[r_regionkey@0]
+                          RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                             ParquetExec: file_groups={ ... }, 
projection=[r_regionkey, r_name], predicate=r_name@1 = AFRICA, 
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false 
ELSE r_name_min@0 <= AFRICA AND AFRICA <= r_name_max@1 END, 
required_guarantees=[r_name in (AFRICA)]
                   CoalesceBatchesExec: target_batch_size=8192
                     RepartitionExec: partitioning=Hash([n_regionkey@3], 2), 
input_partitions=2
diff --git a/testdata/expected-plans/q7.txt b/testdata/expected-plans/q7.txt
index b9e261a..43bc031 100644
--- a/testdata/expected-plans/q7.txt
+++ b/testdata/expected-plans/q7.txt
@@ -6,7 +6,7 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation 
ASC NULLS LAST,
     Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, 
shipping.l_year]], aggr=[[sum(shipping.volume)]]
       SubqueryAlias: shipping
         Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, 
date_part(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, 
lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS 
volume
-          Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name 
= Utf8("GERMANY") AND n2.n_name = Utf8("IRAQ") OR n1.n_name = Utf8("IRAQ") AND 
n2.n_name = Utf8("GERMANY")
+          Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name 
= Utf8View("GERMANY") AND n2.n_name = Utf8View("IRAQ") OR n1.n_name = 
Utf8View("IRAQ") AND n2.n_name = Utf8View("GERMANY")
             Projection: lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_shipdate, customer.c_nationkey, n1.n_name
               Inner Join: supplier.s_nationkey = n1.n_nationkey
                 Projection: supplier.s_nationkey, lineitem.l_extendedprice, 
lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey
@@ -21,17 +21,17 @@ Sort: shipping.supp_nation ASC NULLS LAST, 
shipping.cust_nation ASC NULLS LAST,
                         TableScan: orders projection=[o_orderkey, o_custkey]
                     TableScan: customer projection=[c_custkey, c_nationkey]
                 SubqueryAlias: n1
-                  Filter: nation.n_name = Utf8("GERMANY") OR nation.n_name = 
Utf8("IRAQ")
-                    TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("GERMANY") OR nation.n_name = 
Utf8("IRAQ")]
+                  Filter: nation.n_name = Utf8View("GERMANY") OR nation.n_name 
= Utf8View("IRAQ")
+                    TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("GERMANY") OR nation.n_name = 
Utf8View("IRAQ")]
             SubqueryAlias: n2
-              Filter: nation.n_name = Utf8("IRAQ") OR nation.n_name = 
Utf8("GERMANY")
-                TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("IRAQ") OR nation.n_name = 
Utf8("GERMANY")]
+              Filter: nation.n_name = Utf8View("IRAQ") OR nation.n_name = 
Utf8View("GERMANY")
+                TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8View("IRAQ") OR nation.n_name = 
Utf8View("GERMANY")]
 
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS 
LAST,l_year@2 ASC NULLS LAST]
-  SortExec: expr=[supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS 
LAST,l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
+SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC 
NULLS LAST, l_year@2 ASC NULLS LAST]
+  SortExec: expr=[supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC NULLS LAST, 
l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[supp_nation@0 as supp_nation, cust_nation@1 as 
cust_nation, l_year@2 as l_year, sum(shipping.volume)@3 as revenue]
       AggregateExec: mode=FinalPartitioned, gby=[supp_nation@0 as supp_nation, 
cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[sum(shipping.volume)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -42,9 +42,9 @@ SortPreservingMergeExec: [supp_nation@0 ASC NULLS 
LAST,cust_nation@1 ASC NULLS L
                   HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(n_nationkey@0, c_nationkey@3)], filter=n_name@0 = GERMANY AND n_name@1 = 
IRAQ OR n_name@0 = IRAQ AND n_name@1 = GERMANY, projection=[n_name@1, 
l_extendedprice@2, l_discount@3, l_shipdate@4, n_name@6]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([n_nationkey@0], 2), 
input_partitions=2
-                        RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-                          CoalesceBatchesExec: target_batch_size=8192
-                            FilterExec: n_name@1 = IRAQ OR n_name@1 = GERMANY
+                        CoalesceBatchesExec: target_batch_size=8192
+                          FilterExec: n_name@1 = IRAQ OR n_name@1 = GERMANY
+                            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                               ParquetExec: file_groups={ ... }, 
projection=[n_nationkey, n_name], predicate=n_name@1 = IRAQ OR n_name@1 = 
GERMANY, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 
THEN false ELSE n_name_min@0 <= IRAQ AND IRAQ <= n_name_max@1 END OR CASE WHEN 
n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= 
GERMANY AND GERMANY <= n_name_max@1 END, required_guarantees=[n_name in 
(GERMANY, IRAQ)]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([c_nationkey@3], 2), 
input_partitions=2
@@ -53,9 +53,9 @@ SortPreservingMergeExec: [supp_nation@0 ASC NULLS 
LAST,cust_nation@1 ASC NULLS L
                             HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(n_nationkey@0, s_nationkey@0)], projection=[n_name@1, l_extendedprice@3, 
l_discount@4, l_shipdate@5, c_nationkey@6]
                               CoalesceBatchesExec: target_batch_size=8192
                                 RepartitionExec: 
partitioning=Hash([n_nationkey@0], 2), input_partitions=2
-                                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
-                                    CoalesceBatchesExec: target_batch_size=8192
-                                      FilterExec: n_name@1 = GERMANY OR 
n_name@1 = IRAQ
+                                  CoalesceBatchesExec: target_batch_size=8192
+                                    FilterExec: n_name@1 = GERMANY OR n_name@1 
= IRAQ
+                                      RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
                                         ParquetExec: file_groups={ ... }, 
projection=[n_nationkey, n_name], predicate=n_name@1 = GERMANY OR n_name@1 = 
IRAQ, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN 
false ELSE n_name_min@0 <= GERMANY AND GERMANY <= n_name_max@1 END OR CASE WHEN 
n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= IRAQ 
AND IRAQ <= n_name_max@1 END, required_guarantees=[n_name in (GERMANY, IRAQ)]
                               CoalesceBatchesExec: target_batch_size=8192
                                 RepartitionExec: 
partitioning=Hash([s_nationkey@0], 2), input_partitions=2
@@ -170,13 +170,13 @@ ShuffleWriterExec(stage_id=10, 
output_partitioning=Hash([Column { name: "supp_na
 
 Query Stage #11 (2 -> 2):
 ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: 
"supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { 
name: "l_year", index: 2 }], 2))
-  SortExec: expr=[supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS 
LAST,l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
+  SortExec: expr=[supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC NULLS LAST, 
l_year@2 ASC NULLS LAST], preserve_partitioning=[true]
     ProjectionExec: expr=[supp_nation@0 as supp_nation, cust_nation@1 as 
cust_nation, l_year@2 as l_year, sum(shipping.volume)@3 as revenue]
       AggregateExec: mode=FinalPartitioned, gby=[supp_nation@0 as supp_nation, 
cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[sum(shipping.volume)]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { 
name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, 
Column { name: "l_year", index: 2 }], 2))
 
 Query Stage #12 (2 -> 1):
-SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS 
LAST,l_year@2 ASC NULLS LAST]
+SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC 
NULLS LAST, l_year@2 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: 
"supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { 
name: "l_year", index: 2 }], 2))
 
diff --git a/testdata/expected-plans/q8.txt b/testdata/expected-plans/q8.txt
index f2333a4..e9f5b91 100644
--- a/testdata/expected-plans/q8.txt
+++ b/testdata/expected-plans/q8.txt
@@ -3,7 +3,7 @@ DataFusion Logical Plan
 
 Sort: all_nations.o_year ASC NULLS LAST
   Projection: all_nations.o_year, sum(CASE WHEN all_nations.nation = 
Utf8("IRAQ") THEN all_nations.volume ELSE Int64(0) END) / 
sum(all_nations.volume) AS mkt_share
-    Aggregate: groupBy=[[all_nations.o_year]], aggr=[[sum(CASE WHEN 
all_nations.nation = Utf8("IRAQ") THEN all_nations.volume ELSE 
Decimal128(Some(0),35,4) END) AS sum(CASE WHEN all_nations.nation = 
Utf8("IRAQ") THEN all_nations.volume ELSE Int64(0) END), 
sum(all_nations.volume)]]
+    Aggregate: groupBy=[[all_nations.o_year]], aggr=[[sum(CASE WHEN 
all_nations.nation = Utf8View("IRAQ") THEN all_nations.volume ELSE 
Decimal128(Some(0),35,4) END) AS sum(CASE WHEN all_nations.nation = 
Utf8("IRAQ") THEN all_nations.volume ELSE Int64(0) END), 
sum(all_nations.volume)]]
       SubqueryAlias: all_nations
         Projection: date_part(Utf8("YEAR"), orders.o_orderdate) AS o_year, 
lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS 
volume, n2.n_name AS nation
           Inner Join: n1.n_regionkey = region.r_regionkey
@@ -20,8 +20,8 @@ Sort: all_nations.o_year ASC NULLS LAST
                                 Projection: lineitem.l_orderkey, 
lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount
                                   Inner Join: part.p_partkey = 
lineitem.l_partkey
                                     Projection: part.p_partkey
-                                      Filter: part.p_type = Utf8("LARGE PLATED 
STEEL")
-                                        TableScan: part projection=[p_partkey, 
p_type], partial_filters=[part.p_type = Utf8("LARGE PLATED STEEL")]
+                                      Filter: part.p_type = Utf8View("LARGE 
PLATED STEEL")
+                                        TableScan: part projection=[p_partkey, 
p_type], partial_filters=[part.p_type = Utf8View("LARGE PLATED STEEL")]
                                     TableScan: lineitem 
projection=[l_orderkey, l_partkey, l_suppkey, l_extendedprice, l_discount]
                                 TableScan: supplier projection=[s_suppkey, 
s_nationkey]
                             Filter: orders.o_orderdate >= Date32("1995-01-01") 
AND orders.o_orderdate <= Date32("1996-12-31")
@@ -32,8 +32,8 @@ Sort: all_nations.o_year ASC NULLS LAST
                 SubqueryAlias: n2
                   TableScan: nation projection=[n_nationkey, n_name]
             Projection: region.r_regionkey
-              Filter: region.r_name = Utf8("MIDDLE EAST")
-                TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8("MIDDLE EAST")]
+              Filter: region.r_name = Utf8View("MIDDLE EAST")
+                TableScan: region projection=[r_regionkey, r_name], 
partial_filters=[region.r_name = Utf8View("MIDDLE EAST")]
 
 DataFusion Physical Plan
 ========================
@@ -50,9 +50,9 @@ SortPreservingMergeExec: [o_year@0 ASC NULLS LAST]
                   HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(r_regionkey@0, n_regionkey@3)], projection=[l_extendedprice@1, 
l_discount@2, o_orderdate@3, n_name@5]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([r_regionkey@0], 2), 
input_partitions=2
-                        RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-                          CoalesceBatchesExec: target_batch_size=8192
-                            FilterExec: r_name@1 = MIDDLE EAST, 
projection=[r_regionkey@0]
+                        CoalesceBatchesExec: target_batch_size=8192
+                          FilterExec: r_name@1 = MIDDLE EAST, 
projection=[r_regionkey@0]
+                            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                               ParquetExec: file_groups={ ... }, 
projection=[r_regionkey, r_name], predicate=r_name@1 = MIDDLE EAST, 
pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false 
ELSE r_name_min@0 <= MIDDLE EAST AND MIDDLE EAST <= r_name_max@1 END, 
required_guarantees=[r_name in (MIDDLE EAST)]
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([n_regionkey@3], 2), 
input_partitions=2
diff --git a/testdata/expected-plans/q9.txt b/testdata/expected-plans/q9.txt
index 8f738f4..2c713b3 100644
--- a/testdata/expected-plans/q9.txt
+++ b/testdata/expected-plans/q9.txt
@@ -16,8 +16,8 @@ Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS 
FIRST
                         Projection: lineitem.l_orderkey, lineitem.l_partkey, 
lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, 
lineitem.l_discount
                           Inner Join: part.p_partkey = lineitem.l_partkey
                             Projection: part.p_partkey
-                              Filter: part.p_name LIKE Utf8("%moccasin%")
-                                TableScan: part projection=[p_partkey, 
p_name], partial_filters=[part.p_name LIKE Utf8("%moccasin%")]
+                              Filter: part.p_name LIKE Utf8View("%moccasin%")
+                                TableScan: part projection=[p_partkey, 
p_name], partial_filters=[part.p_name LIKE Utf8View("%moccasin%")]
                             TableScan: lineitem projection=[l_orderkey, 
l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount]
                         TableScan: supplier projection=[s_suppkey, s_nationkey]
                     TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_supplycost]
@@ -27,8 +27,8 @@ Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS 
FIRST
 DataFusion Physical Plan
 ========================
 
-SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]
-  SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC], 
preserve_partitioning=[true]
+SortPreservingMergeExec: [nation@0 ASC NULLS LAST, o_year@1 DESC]
+  SortExec: expr=[nation@0 ASC NULLS LAST, o_year@1 DESC], 
preserve_partitioning=[true]
     ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, 
sum(profit.amount)@2 as sum_profit]
       AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 
as o_year], aggr=[sum(profit.amount)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -160,13 +160,13 @@ ShuffleWriterExec(stage_id=10, 
output_partitioning=Hash([Column { name: "nation"
 
 Query Stage #11 (2 -> 2):
 ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: 
"nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))
-  SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC], 
preserve_partitioning=[true]
+  SortExec: expr=[nation@0 ASC NULLS LAST, o_year@1 DESC], 
preserve_partitioning=[true]
     ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, 
sum(profit.amount)@2 as sum_profit]
       AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 
as o_year], aggr=[sum(profit.amount)]
         CoalesceBatchesExec: target_batch_size=8192
           ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { 
name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))
 
 Query Stage #12 (2 -> 1):
-SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]
+SortPreservingMergeExec: [nation@0 ASC NULLS LAST, o_year@1 DESC]
   ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: 
"nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))
 
diff --git a/tests/test_context.py b/tests/test_context.py
new file mode 100644
index 0000000..ecc3324
--- /dev/null
+++ b/tests/test_context.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion_ray.context import DatafusionRayContext
+from datafusion import SessionContext, SessionConfig, RuntimeConfig, col, lit, 
functions as F
+
+
+def test_basic_query_succeed():
+    df_ctx = SessionContext()
+    ctx = DatafusionRayContext(df_ctx)
+    df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
+    # TODO why does this return a single batch and not a list of batches?
+    record_batches = ctx.sql("SELECT * FROM tips")
+    assert record_batches[0].num_rows == 244
+
+def test_aggregate_csv():
+    df_ctx = SessionContext()
+    ctx = DatafusionRayContext(df_ctx)
+    df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
+    record_batches = ctx.sql("select sex, smoker, avg(tip/total_bill) as 
tip_pct from tips group by sex, smoker")
+    assert isinstance(record_batches, list)
+    # TODO why does this return many empty batches?
+    num_rows = 0
+    for record_batch in record_batches:
+        num_rows += record_batch.num_rows
+    assert num_rows == 4
+
+def test_aggregate_parquet():
+    df_ctx = SessionContext()
+    ctx = DatafusionRayContext(df_ctx)
+    df_ctx.register_parquet("tips", "examples/tips.parquet")
+    record_batches = ctx.sql("select sex, smoker, avg(tip/total_bill) as 
tip_pct from tips group by sex, smoker")
+    # TODO why does this return many empty batches?
+    num_rows = 0
+    for record_batch in record_batches:
+        num_rows += record_batch.num_rows
+    assert num_rows == 4
+
+def test_aggregate_parquet_dataframe():
+    df_ctx = SessionContext()
+    ray_ctx = DatafusionRayContext(df_ctx)
+    df = df_ctx.read_parquet(f"examples/tips.parquet")
+    df = (
+        df.aggregate(
+            [col("sex"), col("smoker"), col("day"), col("time")],
+            [F.avg(col("tip") / col("total_bill")).alias("tip_pct")],
+        )
+        .filter(col("day") != lit("Dinner"))
+        .aggregate([col("sex"), col("smoker")], 
[F.avg(col("tip_pct")).alias("avg_pct")])
+    )
+    ray_results = ray_ctx.plan(df.execution_plan())
+    df_ctx.create_dataframe([ray_results]).show()
+
+
+def test_no_result_query():
+    df_ctx = SessionContext()
+    ctx = DatafusionRayContext(df_ctx)
+    df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
+    ctx.sql("CREATE VIEW tips_view AS SELECT * FROM tips")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to