This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d2c909f chore: Upgrade to DataFusion 44.0.0-rc2 (#1154)
5d2c909f is described below

commit 5d2c909f59caf90348096511d4a26e48a0ae8e3d
Author: Andy Grove <[email protected]>
AuthorDate: Sat Dec 28 15:14:11 2024 -0700

    chore: Upgrade to DataFusion 44.0.0-rc2 (#1154)
    
    * move aggregate expressions to spark-expr crate
    
    * move more expressions
    
    * move benchmark
    
    * normalize_nan
    
    * bitwise not
    
    * comet scalar funcs
    
    * update bench imports
    
    * save
    
    * save
    
    * save
    
    * remove unused imports
    
    * clippy
    
    * implement more hashers
    
    * implement Hash and PartialEq
    
    * implement Hash and PartialEq
    
    * implement Hash and PartialEq
    
    * benches
    
    * fix ScalarUDFImpl.return_type failure
    
    * exclude test from miri
    
    * ignore correct test
    
    * ignore another test
    
    * remove miri checks
    
    * use return_type_from_exprs
    
    * Revert "use return_type_from_exprs"
    
    This reverts commit febc1f1ec1301f9b359fc23ad6a117224fce35b7.
    
    * use DF main branch
    
    * hacky workaround for regression in ScalarUDFImpl.return_type
    
    * fix repo url
    
    * pin to revision
    
    * bump to latest rev
    
    * bump to latest DF rev
    
    * bump DF to rev 9f530dd
    
    * add Cargo.lock
    
    * bump DF version
    
    * no default features
    
    * Revert "remove miri checks"
    
    This reverts commit 4638fe3aa5501966cd5d8b53acf26c698b10b3c9.
    
    * Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930
    
    * update pin
    
    * Update Cargo.toml
    
    Bump to 44.0.0-rc2
    
    * update cargo lock
    
    * revert miri change
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 native/Cargo.lock                                  | 598 ++++++++++-----------
 native/Cargo.toml                                  |  29 +-
 .../expressions/bloom_filter_might_contain.rs      |  46 +-
 native/core/src/execution/expressions/subquery.rs  |  23 +-
 native/core/src/execution/jni_api.rs               |   9 +-
 native/core/src/execution/operators/copy.rs        |   4 +-
 native/core/src/execution/operators/expand.rs      |   6 +-
 native/core/src/execution/operators/filter.rs      |   3 +-
 native/core/src/execution/operators/scan.rs        |   4 +-
 native/core/src/execution/planner.rs               |  55 +-
 .../core/src/execution/shuffle/shuffle_writer.rs   |   6 +-
 native/core/src/execution/util/spark_bit_array.rs  |   2 +-
 .../core/src/execution/util/spark_bloom_filter.rs  |   2 +-
 native/spark-expr/Cargo.toml                       |   1 +
 native/spark-expr/benches/aggregate.rs             |   3 +-
 native/spark-expr/src/avg.rs                       |  22 +-
 native/spark-expr/src/avg_decimal.rs               |  20 +-
 native/spark-expr/src/bitwise_not.rs               |  38 +-
 native/spark-expr/src/cast.rs                      |  42 +-
 native/spark-expr/src/checkoverflow.rs             |  53 +-
 native/spark-expr/src/correlation.rs               |  29 +-
 native/spark-expr/src/covariance.rs                |  26 +-
 native/spark-expr/src/if_expr.rs                   |  49 +-
 native/spark-expr/src/list.rs                      | 127 ++---
 native/spark-expr/src/negative.rs                  |  37 +-
 native/spark-expr/src/normalize_nan.rs             |  45 +-
 native/spark-expr/src/regexp.rs                    |  27 +-
 native/spark-expr/src/stddev.rs                    |  30 +-
 native/spark-expr/src/strings.rs                   |  89 ++-
 native/spark-expr/src/structs.rs                   |  59 +-
 native/spark-expr/src/sum_decimal.rs               |  28 +-
 native/spark-expr/src/temporal.rs                  | 175 +++---
 native/spark-expr/src/to_json.rs                   |  23 +-
 native/spark-expr/src/unbound.rs                   |  21 +-
 native/spark-expr/src/variance.rs                  |  19 +-
 35 files changed, 715 insertions(+), 1035 deletions(-)

diff --git a/native/Cargo.lock b/native/Cargo.lock
index ad572acb..bbc0ff97 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4
 
 [[package]]
 name = "addr2line"
@@ -57,9 +57,9 @@ dependencies = [
 
 [[package]]
 name = "allocator-api2"
-version = "0.2.20"
+version = "0.2.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9"
+checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
 
 [[package]]
 name = "android-tzdata"
@@ -90,9 +90,9 @@ checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
 
 [[package]]
 name = "anyhow"
-version = "1.0.93"
+version = "1.0.95"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775"
+checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
 
 [[package]]
 name = "arc-swap"
@@ -114,9 +114,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
 
 [[package]]
 name = "arrow"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4caf25cdc4a985f91df42ed9e9308e1adbcd341a31a72605c697033fcef163e3"
+checksum = "c91839b07e474b3995035fd8ac33ee54f9c9ccbbb1ea33d9909c71bffdf1259d"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -135,9 +135,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-arith"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91f2dfd1a7ec0aca967dfaa616096aec49779adc8eccec005e2f5e4111b1192a"
+checksum = "855c57c4efd26722b044dcd3e348252560e3e0333087fb9f6479dc0bf744054f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -150,9 +150,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-array"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d39387ca628be747394890a6e47f138ceac1aa912eab64f02519fed24b637af8"
+checksum = "bd03279cea46569acf9295f6224fbc370c5df184b4d2ecfe97ccb131d5615a7f"
 dependencies = [
  "ahash",
  "arrow-buffer",
@@ -161,15 +161,15 @@ dependencies = [
  "chrono",
  "chrono-tz 0.10.0",
  "half",
- "hashbrown 0.14.5",
+ "hashbrown 0.15.2",
  "num",
 ]
 
 [[package]]
 name = "arrow-buffer"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9e51e05228852ffe3eb391ce7178a0f97d2cf80cc6ef91d3c4a6b3cb688049ec"
+checksum = "9e4a9b9b1d6d7117f6138e13bc4dd5daa7f94e671b70e8c9c4dc37b4f5ecfc16"
 dependencies = [
  "bytes",
  "half",
@@ -178,9 +178,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-cast"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d09aea56ec9fa267f3f3f6cdab67d8a9974cbba90b3aa38c8fe9d0bb071bd8c1"
+checksum = "bc70e39916e60c5b7af7a8e2719e3ae589326039e1e863675a008bee5ffe90fd"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -199,9 +199,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-csv"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c07b5232be87d115fde73e32f2ca7f1b353bff1b44ac422d3c6fc6ae38f11f0d"
+checksum = "789b2af43c1049b03a8d088ff6b2257cdcea1756cd76b174b1f2600356771b97"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -218,9 +218,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-data"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b98ae0af50890b494cebd7d6b04b35e896205c1d1df7b29a6272c5d0d0249ef5"
+checksum = "e4e75edf21ffd53744a9b8e3ed11101f610e7ceb1a29860432824f1834a1f623"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -230,9 +230,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ipc"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ed91bdeaff5a1c00d28d8f73466bcb64d32bbd7093b5a30156b4b9f4dba3eee"
+checksum = "d186a909dece9160bf8312f5124d797884f608ef5435a36d9d608e0b2a9bcbf8"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -245,9 +245,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-json"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0471f51260a5309307e5d409c9dc70aede1cd9cf1d4ff0f0a1e8e1a2dd0e0d3c"
+checksum = "b66ff2fedc1222942d0bd2fd391cb14a85baa3857be95c9373179bd616753b85"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -265,9 +265,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ord"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2883d7035e0b600fb4c30ce1e50e66e53d8656aa729f2bfa4b51d359cf3ded52"
+checksum = "ece7b5bc1180e6d82d1a60e1688c199829e8842e38497563c3ab6ea813e527fd"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -280,9 +280,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-row"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "552907e8e587a6fde4f8843fd7a27a576a260f65dab6c065741ea79f633fc5be"
+checksum = "745c114c8f0e8ce211c83389270de6fbe96a9088a7b32c2a041258a443fe83ff"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -294,18 +294,18 @@ dependencies = [
 
 [[package]]
 name = "arrow-schema"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "539ada65246b949bd99ffa0881a9a15a4a529448af1a07a9838dd78617dafab1"
+checksum = "b95513080e728e4cec37f1ff5af4f12c9688d47795d17cda80b6ec2cf74d4678"
 dependencies = [
  "bitflags 2.6.0",
 ]
 
 [[package]]
 name = "arrow-select"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6259e566b752da6dceab91766ed8b2e67bf6270eb9ad8a6e07a33c1bede2b125"
+checksum = "8e415279094ea70323c032c6e739c48ad8d80e78a09bef7117b8718ad5bf3722"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -317,9 +317,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-string"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f3179ccbd18ebf04277a095ba7321b93fd1f774f18816bd5f6b3ce2f594edb6c"
+checksum = "11d956cae7002eb8d83a27dbd34daaea1cf5b75852f0b84deb4d93a276e92bbf"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -346,7 +346,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -385,6 +385,19 @@ version = "0.22.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
 
+[[package]]
+name = "bigdecimal"
+version = "0.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f31f3af01c5c65a07985c804d3366560e6fa7883d640a122819b14ec327482c"
+dependencies = [
+ "autocfg",
+ "libm",
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "bitflags"
 version = "1.3.2"
@@ -408,9 +421,9 @@ dependencies = [
 
 [[package]]
 name = "blake3"
-version = "1.5.4"
+version = "1.5.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7"
+checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e"
 dependencies = [
  "arrayref",
  "arrayvec",
@@ -457,9 +470,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
 
 [[package]]
 name = "bytemuck"
-version = "1.19.0"
+version = "1.21.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d"
+checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3"
 
 [[package]]
 name = "byteorder"
@@ -469,9 +482,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
 
 [[package]]
 name = "bytes"
-version = "1.8.0"
+version = "1.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
+checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
 
 [[package]]
 name = "cast"
@@ -481,9 +494,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
 
 [[package]]
 name = "cc"
-version = "1.2.1"
+version = "1.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47"
+checksum = "8d6dbb628b8f8555f86d0323c2eb39e3ec81901f4b83e091db8a6a76d316a333"
 dependencies = [
  "jobserver",
  "libc",
@@ -504,9 +517,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.38"
+version = "0.4.39"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
+checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
@@ -586,18 +599,18 @@ dependencies = [
 
 [[package]]
 name = "clap"
-version = "4.5.21"
+version = "4.5.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f"
+checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84"
 dependencies = [
  "clap_builder",
 ]
 
 [[package]]
 name = "clap_builder"
-version = "4.5.21"
+version = "4.5.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec"
+checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838"
 dependencies = [
  "anstyle",
  "clap_lex",
@@ -605,9 +618,9 @@ dependencies = [
 
 [[package]]
 name = "clap_lex"
-version = "0.7.3"
+version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7"
+checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
 
 [[package]]
 name = "combine"
@@ -673,9 +686,9 @@ dependencies = [
 
 [[package]]
 name = "cpufeatures"
-version = "0.2.15"
+version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6"
+checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3"
 dependencies = [
  "libc",
 ]
@@ -729,9 +742,9 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-deque"
-version = "0.8.5"
+version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
 dependencies = [
  "crossbeam-epoch",
  "crossbeam-utils",
@@ -748,9 +761,9 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-utils"
-version = "0.8.20"
+version = "0.8.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
 
 [[package]]
 name = "crunchy"
@@ -805,11 +818,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
- "ahash",
  "arrow",
  "arrow-array",
  "arrow-ipc",
@@ -825,6 +836,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-functions",
  "datafusion-functions-aggregate",
+ "datafusion-functions-table",
  "datafusion-functions-window",
  "datafusion-optimizer",
  "datafusion-physical-expr",
@@ -834,18 +846,13 @@ dependencies = [
  "datafusion-sql",
  "futures",
  "glob",
- "half",
- "hashbrown 0.14.5",
- "indexmap",
  "itertools 0.13.0",
  "log",
- "num_cpus",
  "object_store",
  "parking_lot",
  "parquet",
- "paste",
- "pin-project-lite",
  "rand",
+ "regex",
  "sqlparser",
  "tempfile",
  "tokio",
@@ -855,9 +862,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-catalog"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow-schema",
  "async-trait",
@@ -937,6 +943,7 @@ dependencies = [
  "datafusion",
  "datafusion-common",
  "datafusion-expr",
+ "datafusion-expr-common",
  "datafusion-physical-expr",
  "futures",
  "num",
@@ -945,57 +952,56 @@ dependencies = [
  "regex",
  "thiserror",
  "tokio",
- "twox-hash 2.0.1",
+ "twox-hash 2.1.0",
 ]
 
 [[package]]
 name = "datafusion-common"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "ahash",
  "arrow",
  "arrow-array",
  "arrow-buffer",
  "arrow-schema",
- "chrono",
  "half",
  "hashbrown 0.14.5",
  "indexmap",
- "instant",
  "libc",
- "num_cpus",
+ "log",
  "object_store",
  "parquet",
  "paste",
  "sqlparser",
  "tokio",
+ "web-time",
 ]
 
 [[package]]
 name = "datafusion-common-runtime"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "log",
  "tokio",
 ]
 
+[[package]]
+name = "datafusion-doc"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
+
 [[package]]
 name = "datafusion-execution"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow",
- "chrono",
  "dashmap",
  "datafusion-common",
  "datafusion-expr",
  "futures",
- "hashbrown 0.14.5",
  "log",
  "object_store",
  "parking_lot",
@@ -1006,16 +1012,13 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
- "ahash",
  "arrow",
- "arrow-array",
- "arrow-buffer",
  "chrono",
  "datafusion-common",
+ "datafusion-doc",
  "datafusion-expr-common",
  "datafusion-functions-aggregate-common",
  "datafusion-functions-window-common",
@@ -1024,27 +1027,22 @@ dependencies = [
  "paste",
  "serde_json",
  "sqlparser",
- "strum",
- "strum_macros",
 ]
 
 [[package]]
 name = "datafusion-expr-common"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow",
  "datafusion-common",
  "itertools 0.13.0",
- "paste",
 ]
 
 [[package]]
 name = "datafusion-functions"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow",
  "arrow-buffer",
@@ -1053,8 +1051,11 @@ dependencies = [
  "blake3",
  "chrono",
  "datafusion-common",
+ "datafusion-doc",
  "datafusion-execution",
  "datafusion-expr",
+ "datafusion-expr-common",
+ "datafusion-macros",
  "hashbrown 0.14.5",
  "hex",
  "itertools 0.13.0",
@@ -1069,44 +1070,41 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-aggregate"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "ahash",
  "arrow",
  "arrow-schema",
  "datafusion-common",
+ "datafusion-doc",
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-functions-aggregate-common",
+ "datafusion-macros",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "half",
- "indexmap",
  "log",
  "paste",
 ]
 
 [[package]]
 name = "datafusion-functions-aggregate-common"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "ahash",
  "arrow",
  "datafusion-common",
  "datafusion-expr-common",
  "datafusion-physical-expr-common",
- "rand",
 ]
 
 [[package]]
 name = "datafusion-functions-nested"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fca25bbb87323716d05e54114666e942172ccca23c5a507e9c7851db6e965317"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -1122,18 +1120,33 @@ dependencies = [
  "itertools 0.13.0",
  "log",
  "paste",
- "rand",
+]
+
+[[package]]
+name = "datafusion-functions-table"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
+dependencies = [
+ "arrow",
+ "async-trait",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-expr",
+ "datafusion-physical-plan",
+ "parking_lot",
+ "paste",
 ]
 
 [[package]]
 name = "datafusion-functions-window"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "datafusion-common",
+ "datafusion-doc",
  "datafusion-expr",
  "datafusion-functions-window-common",
+ "datafusion-macros",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "log",
@@ -1142,48 +1155,49 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-window-common"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "datafusion-common",
  "datafusion-physical-expr-common",
 ]
 
+[[package]]
+name = "datafusion-macros"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
+dependencies = [
+ "quote",
+ "syn 2.0.92",
+]
+
 [[package]]
 name = "datafusion-optimizer"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow",
- "async-trait",
  "chrono",
  "datafusion-common",
  "datafusion-expr",
  "datafusion-physical-expr",
- "hashbrown 0.14.5",
  "indexmap",
  "itertools 0.13.0",
  "log",
- "paste",
+ "regex",
  "regex-syntax",
 ]
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "ahash",
  "arrow",
  "arrow-array",
  "arrow-buffer",
- "arrow-ord",
  "arrow-schema",
- "arrow-string",
- "chrono",
  "datafusion-common",
  "datafusion-expr",
  "datafusion-expr-common",
@@ -1200,39 +1214,36 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-expr-common"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "ahash",
  "arrow",
  "datafusion-common",
  "datafusion-expr-common",
  "hashbrown 0.14.5",
- "rand",
+ "itertools 0.13.0",
 ]
 
 [[package]]
 name = "datafusion-physical-optimizer"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow",
- "arrow-schema",
  "datafusion-common",
  "datafusion-execution",
  "datafusion-expr-common",
  "datafusion-physical-expr",
  "datafusion-physical-plan",
  "itertools 0.13.0",
+ "log",
 ]
 
 [[package]]
 name = "datafusion-physical-plan"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "ahash",
  "arrow",
@@ -1246,7 +1257,6 @@ dependencies = [
  "datafusion-common-runtime",
  "datafusion-execution",
  "datafusion-expr",
- "datafusion-functions-aggregate-common",
  "datafusion-functions-window-common",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
@@ -1256,29 +1266,26 @@ dependencies = [
  "indexmap",
  "itertools 0.13.0",
  "log",
- "once_cell",
  "parking_lot",
  "pin-project-lite",
- "rand",
  "tokio",
 ]
 
 [[package]]
 name = "datafusion-sql"
-version = "43.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51"
+version = "44.0.0"
+source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4"
 dependencies = [
  "arrow",
  "arrow-array",
  "arrow-schema",
+ "bigdecimal",
  "datafusion-common",
  "datafusion-expr",
  "indexmap",
  "log",
  "regex",
  "sqlparser",
- "strum",
 ]
 
 [[package]]
@@ -1326,7 +1333,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -1343,19 +1350,19 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
 
 [[package]]
 name = "errno"
-version = "0.3.9"
+version = "0.3.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
+checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
 dependencies = [
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
 name = "fastrand"
-version = "2.2.0"
+version = "2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
+checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
 
 [[package]]
 name = "findshlibs"
@@ -1377,9 +1384,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 
 [[package]]
 name = "flatbuffers"
-version = "24.3.25"
+version = "24.12.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f"
+checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096"
 dependencies = [
  "bitflags 1.3.2",
  "rustc_version",
@@ -1466,7 +1473,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -1555,9 +1562,9 @@ dependencies = [
 
 [[package]]
 name = "hashbrown"
-version = "0.15.1"
+version = "0.15.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3"
+checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
 
 [[package]]
 name = "heck"
@@ -1574,12 +1581,6 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
-[[package]]
-name = "hermit-abi"
-version = "0.3.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
-
 [[package]]
 name = "hermit-abi"
 version = "0.4.0"
@@ -1594,11 +1595,11 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
 [[package]]
 name = "home"
-version = "0.5.9"
+version = "0.5.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
+checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
 dependencies = [
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -1745,7 +1746,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -1771,12 +1772,12 @@ dependencies = [
 
 [[package]]
 name = "indexmap"
-version = "2.6.0"
+version = "2.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
+checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f"
 dependencies = [
  "equivalent",
- "hashbrown 0.15.1",
+ "hashbrown 0.15.2",
 ]
 
 [[package]]
@@ -1797,18 +1798,6 @@ dependencies = [
  "str_stack",
 ]
 
-[[package]]
-name = "instant"
-version = "0.1.13"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
-dependencies = [
- "cfg-if",
- "js-sys",
- "wasm-bindgen",
- "web-sys",
-]
-
 [[package]]
 name = "integer-encoding"
 version = "3.0.4"
@@ -1821,7 +1810,7 @@ version = "0.4.13"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b"
 dependencies = [
- "hermit-abi 0.4.0",
+ "hermit-abi",
  "libc",
  "windows-sys 0.52.0",
 ]
@@ -1864,9 +1853,9 @@ dependencies = [
 
 [[package]]
 name = "itoa"
-version = "1.0.11"
+version = "1.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
+checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
 
 [[package]]
 name = "java-locator"
@@ -1912,10 +1901,11 @@ dependencies = [
 
 [[package]]
 name = "js-sys"
-version = "0.3.72"
+version = "0.3.76"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9"
+checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7"
 dependencies = [
+ "once_cell",
  "wasm-bindgen",
 ]
 
@@ -1927,9 +1917,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
 
 [[package]]
 name = "lexical-core"
-version = "1.0.2"
+version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458"
+checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958"
 dependencies = [
  "lexical-parse-float",
  "lexical-parse-integer",
@@ -1940,9 +1930,9 @@ dependencies = [
 
 [[package]]
 name = "lexical-parse-float"
-version = "1.0.2"
+version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0"
+checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2"
 dependencies = [
  "lexical-parse-integer",
  "lexical-util",
@@ -1951,9 +1941,9 @@ dependencies = [
 
 [[package]]
 name = "lexical-parse-integer"
-version = "1.0.2"
+version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61"
+checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e"
 dependencies = [
  "lexical-util",
  "static_assertions",
@@ -1961,18 +1951,18 @@ dependencies = [
 
 [[package]]
 name = "lexical-util"
-version = "1.0.3"
+version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0"
+checksum = "5a82e24bf537fd24c177ffbbdc6ebcc8d54732c35b50a3f28cc3f4e4c949a0b3"
 dependencies = [
  "static_assertions",
 ]
 
 [[package]]
 name = "lexical-write-float"
-version = "1.0.2"
+version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809"
+checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd"
 dependencies = [
  "lexical-util",
  "lexical-write-integer",
@@ -1981,9 +1971,9 @@ dependencies = [
 
 [[package]]
 name = "lexical-write-integer"
-version = "1.0.2"
+version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162"
+checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978"
 dependencies = [
  "lexical-util",
  "static_assertions",
@@ -1991,9 +1981,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.162"
+version = "0.2.169"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398"
+checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
 
 [[package]]
 name = "libloading"
@@ -2029,9 +2019,9 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
 
 [[package]]
 name = "litemap"
-version = "0.7.3"
+version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704"
+checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"
 
 [[package]]
 name = "lock_api"
@@ -2131,9 +2121,9 @@ dependencies = [
 
 [[package]]
 name = "miniz_oxide"
-version = "0.8.0"
+version = "0.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
+checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394"
 dependencies = [
  "adler2",
 ]
@@ -2239,30 +2229,20 @@ dependencies = [
  "libm",
 ]
 
-[[package]]
-name = "num_cpus"
-version = "1.16.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
-dependencies = [
- "hermit-abi 0.3.9",
- "libc",
-]
-
 [[package]]
 name = "object"
-version = "0.36.5"
+version = "0.36.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
+checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87"
 dependencies = [
  "memchr",
 ]
 
 [[package]]
 name = "object_store"
-version = "0.11.1"
+version = "0.11.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3"
+checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf"
 dependencies = [
  "async-trait",
  "bytes",
@@ -2325,9 +2305,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "53.2.0"
+version = "53.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "dea02606ba6f5e856561d8d507dba8bac060aefca2a6c0f1aa1d361fed91ff3e"
+checksum = "2b449890367085eb65d7d3321540abc3d7babbd179ce31df0016e90719114191"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -2344,7 +2324,7 @@ dependencies = [
  "flate2",
  "futures",
  "half",
- "hashbrown 0.14.5",
+ "hashbrown 0.15.2",
  "lz4_flex",
  "num",
  "num-bigint",
@@ -2506,9 +2486,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.89"
+version = "1.0.92"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
+checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
 dependencies = [
  "unicode-ident",
 ]
@@ -2576,7 +2556,7 @@ dependencies = [
  "itertools 0.12.1",
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -2600,9 +2580,9 @@ dependencies = [
 
 [[package]]
 name = "quote"
-version = "1.0.37"
+version = "1.0.38"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
+checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc"
 dependencies = [
  "proc-macro2",
 ]
@@ -2659,9 +2639,9 @@ dependencies = [
 
 [[package]]
 name = "redox_syscall"
-version = "0.5.7"
+version = "0.5.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
+checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834"
 dependencies = [
  "bitflags 2.6.0",
 ]
@@ -2721,22 +2701,22 @@ dependencies = [
 
 [[package]]
 name = "rustix"
-version = "0.38.40"
+version = "0.38.42"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0"
+checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85"
 dependencies = [
  "bitflags 2.6.0",
  "errno",
  "libc",
  "linux-raw-sys",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
 name = "rustversion"
-version = "1.0.18"
+version = "1.0.19"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248"
+checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4"
 
 [[package]]
 name = "ryu"
@@ -2761,9 +2741,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
 
 [[package]]
 name = "semver"
-version = "1.0.23"
+version = "1.0.24"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
+checksum = "3cb6eb87a131f756572d7fb904f6e7b68633f09cca868c5df1c4b8d1a694bbba"
 
 [[package]]
 name = "seq-macro"
@@ -2773,9 +2753,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
 
 [[package]]
 name = "serde"
-version = "1.0.215"
+version = "1.0.217"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f"
+checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70"
 dependencies = [
  "serde_derive",
 ]
@@ -2792,20 +2772,20 @@ dependencies = [
 
 [[package]]
 name = "serde_derive"
-version = "1.0.215"
+version = "1.0.217"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0"
+checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
 name = "serde_json"
-version = "1.0.132"
+version = "1.0.134"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03"
+checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d"
 dependencies = [
  "itoa",
  "memchr",
@@ -2888,7 +2868,7 @@ dependencies = [
  "heck 0.5.0",
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -2899,9 +2879,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
 [[package]]
 name = "sqlparser"
-version = "0.51.0"
+version = "0.53.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
+checksum = "05a528114c392209b3264855ad491fcce534b94a38771b0a0b97a79379275ce8"
 dependencies = [
  "log",
  "sqlparser_derive",
@@ -2909,13 +2889,13 @@ dependencies = [
 
 [[package]]
 name = "sqlparser_derive"
-version = "0.2.2"
+version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
+checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -2941,9 +2921,6 @@ name = "strum"
 version = "0.26.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
-dependencies = [
- "strum_macros",
-]
 
 [[package]]
 name = "strum_macros"
@@ -2955,7 +2932,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustversion",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -2966,9 +2943,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
 
 [[package]]
 name = "symbolic-common"
-version = "12.12.1"
+version = "12.12.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3d4d73159efebfb389d819fd479afb2dbd57dcb3e3f4b7fcfa0e675f5a46c1cb"
+checksum = "cd33e73f154e36ec223c18013f7064a2c120f1162fc086ac9933542def186b00"
 dependencies = [
  "debugid",
  "memmap2",
@@ -2978,9 +2955,9 @@ dependencies = [
 
 [[package]]
 name = "symbolic-demangle"
-version = "12.12.1"
+version = "12.12.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a767859f6549c665011970874c3f541838b4835d5aaaa493d3ee383918be9f10"
+checksum = "89e51191290147f071777e37fe111800bb82a9059f9c95b19d2dd41bfeddf477"
 dependencies = [
  "cpp_demangle",
  "rustc-demangle",
@@ -3000,9 +2977,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.87"
+version = "2.0.92"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
+checksum = "70ae51629bf965c5c098cc9e87908a3df5301051a9e087d6f9bef5c9771ed126"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3017,7 +2994,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -3050,7 +3027,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
@@ -3105,9 +3082,9 @@ dependencies = [
 
 [[package]]
 name = "tokio"
-version = "1.41.1"
+version = "1.42.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33"
+checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551"
 dependencies = [
  "backtrace",
  "bytes",
@@ -3123,14 +3100,14 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
 name = "tracing"
-version = "0.1.40"
+version = "0.1.41"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
+checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
 dependencies = [
  "pin-project-lite",
  "tracing-attributes",
@@ -3139,20 +3116,20 @@ dependencies = [
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.27"
+version = "0.1.28"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
+checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
 name = "tracing-core"
-version = "0.1.32"
+version = "0.1.33"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
+checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
 dependencies = [
  "once_cell",
 ]
@@ -3169,9 +3146,9 @@ dependencies = [
 
 [[package]]
 name = "twox-hash"
-version = "2.0.1"
+version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a6db6856664807f43c17fbaf2718e2381ac1476a449aa104f5f64622defa1245"
+checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908"
 dependencies = [
  "rand",
 ]
@@ -3193,9 +3170,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
 
 [[package]]
 name = "unicode-ident"
-version = "1.0.13"
+version = "1.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
+checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
 
 [[package]]
 name = "unicode-segmentation"
@@ -3226,9 +3203,9 @@ checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
 
 [[package]]
 name = "url"
-version = "2.5.3"
+version = "2.5.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada"
+checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60"
 dependencies = [
  "form_urlencoded",
  "idna",
@@ -3280,9 +3257,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
 [[package]]
 name = "wasm-bindgen"
-version = "0.2.95"
+version = "0.2.99"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
+checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396"
 dependencies = [
  "cfg-if",
  "once_cell",
@@ -3291,24 +3268,23 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-backend"
-version = "0.2.95"
+version = "0.2.99"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
+checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79"
 dependencies = [
  "bumpalo",
  "log",
- "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
  "wasm-bindgen-shared",
 ]
 
 [[package]]
 name = "wasm-bindgen-macro"
-version = "0.2.95"
+version = "0.2.99"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
+checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe"
 dependencies = [
  "quote",
  "wasm-bindgen-macro-support",
@@ -3316,28 +3292,38 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-macro-support"
-version = "0.2.95"
+version = "0.2.99"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
+checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]
 
 [[package]]
 name = "wasm-bindgen-shared"
-version = "0.2.95"
+version = "0.2.99"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
+checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6"
 
 [[package]]
 name = "web-sys"
-version = "0.3.72"
+version = "0.3.76"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112"
+checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "web-time"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
 dependencies = [
  "js-sys",
  "wasm-bindgen",
@@ -3557,9 +3543,9 @@ checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
 
 [[package]]
 name = "yoke"
-version = "0.7.4"
+version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5"
+checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40"
 dependencies = [
  "serde",
  "stable_deref_trait",
@@ -3569,13 +3555,13 @@ dependencies = [
 
 [[package]]
 name = "yoke-derive"
-version = "0.7.4"
+version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95"
+checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
  "synstructure",
 ]
 
@@ -3597,27 +3583,27 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
 name = "zerofrom"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55"
+checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e"
 dependencies = [
  "zerofrom-derive",
 ]
 
 [[package]]
 name = "zerofrom-derive"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5"
+checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
  "synstructure",
 ]
 
@@ -3640,7 +3626,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.87",
+ "syn 2.0.92",
 ]
 
 [[package]]
diff --git a/native/Cargo.toml b/native/Cargo.toml
index bd46cf0c..cf4921eb 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -33,20 +33,21 @@ edition = "2021"
 rust-version = "1.79"
 
 [workspace.dependencies]
-arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
-arrow-array = { version = "53.2.0" }
-arrow-buffer = { version = "53.2.0" }
-arrow-data = { version = "53.2.0" }
-arrow-schema = { version = "53.2.0" }
-parquet = { version = "53.2.0", default-features = false, features = ["experimental"] }
-datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
-datafusion-common = { version = "43.0.0" }
-datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
-datafusion-functions-nested = { version = "43.0.0", default-features = false }
-datafusion-expr = { version = "43.0.0", default-features = false }
-datafusion-execution = { version = "43.0.0", default-features = false }
-datafusion-physical-plan = { version = "43.0.0", default-features = false }
-datafusion-physical-expr = { version = "43.0.0", default-features = false }
+arrow = { version = "53.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow-array = { version = "53.3.0" }
+arrow-buffer = { version = "53.3.0" }
+arrow-data = { version = "53.3.0" }
+arrow-schema = { version = "53.3.0" }
+parquet = { version = "53.3.0", default-features = false, features = ["experimental"] }
+datafusion = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
+datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false, features = ["crypto_expressions"] }
+datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
 datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
 datafusion-comet-proto = { path = "proto", version = "0.5.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
diff --git a/native/core/src/execution/expressions/bloom_filter_might_contain.rs b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
index af6a5a47..b14fab62 100644
--- a/native/core/src/execution/expressions/bloom_filter_might_contain.rs
+++ b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
@@ -19,26 +19,37 @@ use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data
 use arrow::record_batch::RecordBatch;
 use arrow_array::cast::as_primitive_array;
 use arrow_schema::{DataType, Schema};
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion::physical_plan::ColumnarValue;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_physical_expr::PhysicalExpr;
-use std::{
-    any::Any,
-    fmt::Display,
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
+use std::hash::Hash;
+use std::{any::Any, fmt::Display, sync::Arc};
 
 /// A physical expression that checks if a value might be in a bloom filter. It corresponds to the
 /// Spark's `BloomFilterMightContain` expression.
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct BloomFilterMightContain {
     pub bloom_filter_expr: Arc<dyn PhysicalExpr>,
     pub value_expr: Arc<dyn PhysicalExpr>,
     bloom_filter: Option<SparkBloomFilter>,
 }
 
+impl Hash for BloomFilterMightContain {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.bloom_filter_expr.hash(state);
+        self.value_expr.hash(state);
+        self.bloom_filter.hash(state);
+    }
+}
+
+impl PartialEq for BloomFilterMightContain {
+    fn eq(&self, other: &Self) -> bool {
+        self.bloom_filter_expr.eq(&other.bloom_filter_expr)
+            && self.value_expr.eq(&other.value_expr)
+            && self.bloom_filter.eq(&other.bloom_filter)
+    }
+}
+
 impl Display for BloomFilterMightContain {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(
@@ -49,18 +60,6 @@ impl Display for BloomFilterMightContain {
     }
 }
 
-impl PartialEq<dyn Any> for BloomFilterMightContain {
-    fn eq(&self, _other: &dyn Any) -> bool {
-        down_cast_any_ref(_other)
-            .downcast_ref::<Self>()
-            .map(|other| {
-                self.bloom_filter_expr.eq(&other.bloom_filter_expr)
-                    && self.value_expr.eq(&other.value_expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
 fn evaluate_bloom_filter(
     bloom_filter_expr: &Arc<dyn PhysicalExpr>,
 ) -> Result<Option<SparkBloomFilter>> {
@@ -141,11 +140,4 @@ impl PhysicalExpr for BloomFilterMightContain {
             Arc::clone(&children[1]),
         )?))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.bloom_filter_expr.hash(&mut s);
-        self.value_expr.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
diff --git a/native/core/src/execution/expressions/subquery.rs b/native/core/src/execution/expressions/subquery.rs
index 3eeb29c1..d933a609 100644
--- a/native/core/src/execution/expressions/subquery.rs
+++ b/native/core/src/execution/expressions/subquery.rs
@@ -22,7 +22,6 @@ use crate::{
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Schema, TimeUnit};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{internal_err, ScalarValue};
 use datafusion_physical_expr::PhysicalExpr;
 use jni::{
@@ -32,11 +31,11 @@ use jni::{
 use std::{
     any::Any,
     fmt::{Display, Formatter},
-    hash::{Hash, Hasher},
+    hash::Hash,
     sync::Arc,
 };
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Hash, PartialEq, Eq)]
 pub struct Subquery {
     /// The ID of the execution context that owns this subquery. We use this ID to retrieve the
     /// subquery result.
@@ -63,19 +62,6 @@ impl Display for Subquery {
     }
 }
 
-impl PartialEq<dyn Any> for Subquery {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.id.eq(&x.id)
-                    && self.data_type.eq(&x.data_type)
-                    && self.exec_context_id.eq(&x.exec_context_id)
-            })
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for Subquery {
     fn as_any(&self) -> &dyn Any {
         self
@@ -209,9 +195,4 @@ impl PhysicalExpr for Subquery {
     ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
         Ok(self)
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.hash(&mut s)
-    }
 }
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 2c1a55f4..09caf5e2 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -20,10 +20,7 @@
 use arrow::datatypes::DataType as ArrowDataType;
 use arrow_array::RecordBatch;
 use datafusion::{
-    execution::{
-        disk_manager::DiskManagerConfig,
-        runtime_env::{RuntimeConfig, RuntimeEnv},
-    },
+    execution::{disk_manager::DiskManagerConfig, runtime_env::RuntimeEnv},
     physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
     prelude::{SessionConfig, SessionContext},
 };
@@ -52,6 +49,7 @@ use crate::{
 };
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_common::ScalarValue;
+use datafusion_execution::runtime_env::RuntimeEnvBuilder;
 use futures::stream::StreamExt;
 use jni::{
     objects::GlobalRef,
@@ -188,7 +186,7 @@ fn prepare_datafusion_session_context(
     memory_fraction: f64,
     comet_task_memory_manager: Arc<GlobalRef>,
 ) -> CometResult<SessionContext> {
-    let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
+    let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);
 
     // Check if we are using unified memory manager integrated with Spark.
     if use_unified_memory_manager {
@@ -216,6 +214,7 @@ fn prepare_datafusion_session_context(
             &ScalarValue::Float64(Some(1.1)),
         );
 
+    #[allow(deprecated)]
     let runtime = RuntimeEnv::try_new(rt_config)?;
 
     let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index 8eeda8a5..cec00eb2 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -30,6 +30,7 @@ use arrow_array::{
 use arrow_data::transform::MutableArrayData;
 use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef};
 
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
 use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
@@ -78,7 +79,8 @@ impl CopyExec {
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&schema)),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         );
 
         Self {
diff --git a/native/core/src/execution/operators/expand.rs b/native/core/src/execution/operators/expand.rs
index fb43a6e4..f75822d4 100644
--- a/native/core/src/execution/operators/expand.rs
+++ b/native/core/src/execution/operators/expand.rs
@@ -17,10 +17,11 @@
 
 use arrow_array::{RecordBatch, RecordBatchOptions};
 use arrow_schema::SchemaRef;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::{
     execution::TaskContext,
     physical_plan::{
-        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
         RecordBatchStream, SendableRecordBatchStream,
     },
 };
@@ -54,7 +55,8 @@ impl ExpandExec {
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&schema)),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         );
 
         Self {
diff --git a/native/core/src/execution/operators/filter.rs 
b/native/core/src/execution/operators/filter.rs
index d9a54712..eab30a35 100644
--- a/native/core/src/execution/operators/filter.rs
+++ b/native/core/src/execution/operators/filter.rs
@@ -210,7 +210,8 @@ impl FilterExec {
         Ok(PlanProperties::new(
             eq_properties,
             input.output_partitioning().clone(), // Output Partitioning
-            input.execution_mode(),              // Execution Mode
+            input.pipeline_behavior(),
+            input.boundedness(),
         ))
     }
 }
diff --git a/native/core/src/execution/operators/scan.rs 
b/native/core/src/execution/operators/scan.rs
index a297f87c..888cd2fd 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -28,6 +28,7 @@ use arrow_data::ffi::FFI_ArrowArray;
 use arrow_data::ArrayData;
 use arrow_schema::ffi::FFI_ArrowSchema;
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
 };
@@ -122,7 +123,8 @@ impl ScanExec {
             // The partitioning is not important because we are not using 
DataFusion's
             // query planner or optimizer
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         );
 
         Ok(Self {
diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index 0a749335..5a35c62e 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -94,7 +94,6 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, 
TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
-use datafusion_expr::expr::find_df_window_func;
 use datafusion_expr::{
     AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
     WindowFunctionDefinition,
@@ -1515,10 +1514,7 @@ impl PhysicalPlanner {
 
                 let builder = match datatype {
                     DataType::Decimal128(_, _) => {
-                        let func = 
AggregateUDF::new_from_impl(SumDecimal::try_new(
-                            Arc::clone(&child),
-                            datatype,
-                        )?);
+                        let func = 
AggregateUDF::new_from_impl(SumDecimal::try_new(datatype)?);
                         AggregateExprBuilder::new(Arc::new(func), vec![child])
                     }
                     _ => {
@@ -1543,11 +1539,8 @@ impl PhysicalPlanner {
                 let input_datatype = 
to_arrow_datatype(expr.sum_datatype.as_ref().unwrap());
                 let builder = match datatype {
                     DataType::Decimal128(_, _) => {
-                        let func = AggregateUDF::new_from_impl(AvgDecimal::new(
-                            Arc::clone(&child),
-                            datatype,
-                            input_datatype,
-                        ));
+                        let func =
+                            
AggregateUDF::new_from_impl(AvgDecimal::new(datatype, input_datatype));
                         AggregateExprBuilder::new(Arc::new(func), vec![child])
                     }
                     _ => {
@@ -1556,11 +1549,7 @@ impl PhysicalPlanner {
                         // failure since it should have already been checked 
at Spark side.
                         let child: Arc<dyn PhysicalExpr> =
                             Arc::new(CastExpr::new(Arc::clone(&child), 
datatype.clone(), None));
-                        let func = AggregateUDF::new_from_impl(Avg::new(
-                            Arc::clone(&child),
-                            "avg",
-                            datatype,
-                        ));
+                        let func = AggregateUDF::new_from_impl(Avg::new("avg", 
datatype));
                         AggregateExprBuilder::new(Arc::new(func), vec![child])
                     }
                 };
@@ -1638,8 +1627,6 @@ impl PhysicalPlanner {
                 match expr.stats_type {
                     0 => {
                         let func = AggregateUDF::new_from_impl(Covariance::new(
-                            Arc::clone(&child1),
-                            Arc::clone(&child2),
                             "covariance",
                             datatype,
                             StatsType::Sample,
@@ -1655,8 +1642,6 @@ impl PhysicalPlanner {
                     }
                     1 => {
                         let func = AggregateUDF::new_from_impl(Covariance::new(
-                            Arc::clone(&child1),
-                            Arc::clone(&child2),
                             "covariance_pop",
                             datatype,
                             StatsType::Population,
@@ -1682,7 +1667,6 @@ impl PhysicalPlanner {
                 match expr.stats_type {
                     0 => {
                         let func = AggregateUDF::new_from_impl(Variance::new(
-                            Arc::clone(&child),
                             "variance",
                             datatype,
                             StatsType::Sample,
@@ -1693,7 +1677,6 @@ impl PhysicalPlanner {
                     }
                     1 => {
                         let func = AggregateUDF::new_from_impl(Variance::new(
-                            Arc::clone(&child),
                             "variance_pop",
                             datatype,
                             StatsType::Population,
@@ -1714,7 +1697,6 @@ impl PhysicalPlanner {
                 match expr.stats_type {
                     0 => {
                         let func = AggregateUDF::new_from_impl(Stddev::new(
-                            Arc::clone(&child),
                             "stddev",
                             datatype,
                             StatsType::Sample,
@@ -1725,7 +1707,6 @@ impl PhysicalPlanner {
                     }
                     1 => {
                         let func = AggregateUDF::new_from_impl(Stddev::new(
-                            Arc::clone(&child),
                             "stddev_pop",
                             datatype,
                             StatsType::Population,
@@ -1747,8 +1728,6 @@ impl PhysicalPlanner {
                     self.create_expr(expr.child2.as_ref().unwrap(), 
Arc::clone(&schema))?;
                 let datatype = 
to_arrow_datatype(expr.datatype.as_ref().unwrap());
                 let func = AggregateUDF::new_from_impl(Correlation::new(
-                    Arc::clone(&child1),
-                    Arc::clone(&child2),
                     "correlation",
                     datatype,
                     expr.null_on_divide_by_zero,
@@ -1935,7 +1914,7 @@ impl PhysicalPlanner {
             window_func_name,
             &window_args,
             partition_by,
-            sort_exprs,
+            &LexOrdering::new(sort_exprs.to_vec()),
             window_frame.into(),
             input_schema.as_ref(),
             false, // TODO: Ignore nulls
@@ -1985,15 +1964,11 @@ impl PhysicalPlanner {
 
     /// Find DataFusion's built-in window function by name.
     fn find_df_window_function(&self, name: &str) -> 
Option<WindowFunctionDefinition> {
-        if let Some(f) = find_df_window_func(name) {
-            Some(f)
-        } else {
-            let registry = &self.session_ctx.state();
-            registry
-                .udaf(name)
-                .map(WindowFunctionDefinition::AggregateUDF)
-                .ok()
-        }
+        let registry = &self.session_ctx.state();
+        registry
+            .udaf(name)
+            .map(WindowFunctionDefinition::AggregateUDF)
+            .ok()
     }
 
     /// Create a DataFusion physical partitioning from Spark physical 
partitioning
@@ -2049,7 +2024,15 @@ impl PhysicalPlanner {
                         .coerce_types(&input_expr_types)
                         .unwrap_or_else(|_| input_expr_types.clone());
 
-                    let data_type = func.inner().return_type(&coerced_types)?;
+                    let data_type = match fun_name {
+                        // workaround for 
https://github.com/apache/datafusion/issues/13716
+                        "datepart" => DataType::Int32,
+                        _ => {
+                            // TODO need to call `return_type_from_exprs` 
instead
+                            #[allow(deprecated)]
+                            func.inner().return_type(&coerced_types)?
+                        }
+                    };
 
                     (data_type, coerced_types)
                 }
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs 
b/native/core/src/execution/shuffle/shuffle_writer.rs
index 01117199..f3fa685b 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -25,6 +25,7 @@ use arrow::{datatypes::*, ipc::writer::StreamWriter};
 use async_trait::async_trait;
 use bytes::Buf;
 use crc32fast::Hasher;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::{
     arrow::{
         array::*,
@@ -44,7 +45,7 @@ use datafusion::{
             BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, 
MetricsSet, Time,
         },
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, 
Partitioning, PlanProperties,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
PlanProperties,
         RecordBatchStream, SendableRecordBatchStream, Statistics,
     },
 };
@@ -191,7 +192,8 @@ impl ShuffleWriterExec {
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&input.schema())),
             partitioning.clone(),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         );
 
         Ok(ShuffleWriterExec {
diff --git a/native/core/src/execution/util/spark_bit_array.rs 
b/native/core/src/execution/util/spark_bit_array.rs
index 6cfecc1b..3ac8b199 100644
--- a/native/core/src/execution/util/spark_bit_array.rs
+++ b/native/core/src/execution/util/spark_bit_array.rs
@@ -22,7 +22,7 @@ use std::iter::zip;
 /// A simple bit array implementation that simulates the behavior of Spark's 
BitArray which is
 /// used in the BloomFilter implementation. Some methods are not implemented 
as they are not
 /// required for the current use case.
-#[derive(Debug, Hash)]
+#[derive(Debug, Hash, PartialEq, Eq)]
 pub struct SparkBitArray {
     data: Vec<u64>,
     bit_count: usize,
diff --git a/native/core/src/execution/util/spark_bloom_filter.rs 
b/native/core/src/execution/util/spark_bloom_filter.rs
index 2c3af169..61245757 100644
--- a/native/core/src/execution/util/spark_bloom_filter.rs
+++ b/native/core/src/execution/util/spark_bloom_filter.rs
@@ -27,7 +27,7 @@ const SPARK_BLOOM_FILTER_VERSION_1: i32 = 1;
 /// A Bloom filter implementation that simulates the behavior of Spark's 
BloomFilter.
 /// It's not a complete implementation of Spark's BloomFilter, but just add 
the minimum
 /// methods to support mightContainsLong in the native side.
-#[derive(Debug, Hash)]
+#[derive(Debug, Hash, PartialEq, Eq)]
 pub struct SparkBloomFilter {
     bits: SparkBitArray,
     num_hash_functions: u32,
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 27367d83..fc348f81 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -36,6 +36,7 @@ chrono = { workspace = true }
 datafusion = { workspace = true, features = ["parquet"] }
 datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-expr-common = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 chrono-tz = { workspace = true }
 num = { workspace = true }
diff --git a/native/spark-expr/benches/aggregate.rs 
b/native/spark-expr/benches/aggregate.rs
index 43194fdd..051ac5eb 100644
--- a/native/spark-expr/benches/aggregate.rs
+++ b/native/spark-expr/benches/aggregate.rs
@@ -66,7 +66,6 @@ fn criterion_benchmark(c: &mut Criterion) {
 
     group.bench_function("avg_decimal_comet", |b| {
         let comet_avg_decimal = 
Arc::new(AggregateUDF::new_from_impl(AvgDecimal::new(
-            Arc::clone(&c1),
             DataType::Decimal128(38, 10),
             DataType::Decimal128(38, 10),
         )));
@@ -96,7 +95,7 @@ fn criterion_benchmark(c: &mut Criterion) {
 
     group.bench_function("sum_decimal_comet", |b| {
         let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(
-            SumDecimal::try_new(Arc::clone(&c1), DataType::Decimal128(38, 
10)).unwrap(),
+            SumDecimal::try_new(DataType::Decimal128(38, 10)).unwrap(),
         ));
         b.to_async(&rt).iter(|| {
             black_box(agg_test(
diff --git a/native/spark-expr/src/avg.rs b/native/spark-expr/src/avg.rs
index 7820497d..816440ac 100644
--- a/native/spark-expr/src/avg.rs
+++ b/native/spark-expr/src/avg.rs
@@ -27,11 +27,10 @@ use datafusion::logical_expr::{
     type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, 
GroupsAccumulator, Signature,
 };
 use datafusion_common::{not_impl_err, Result, ScalarValue};
-use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
+use datafusion_physical_expr::expressions::format_state_name;
 use std::{any::Any, sync::Arc};
 
 use arrow_array::ArrowNativeTypeOp;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::Volatility::Immutable;
 use datafusion_expr::{AggregateUDFImpl, ReversedUDAF};
@@ -42,20 +41,19 @@ use DataType::*;
 pub struct Avg {
     name: String,
     signature: Signature,
-    expr: Arc<dyn PhysicalExpr>,
+    // expr: Arc<dyn PhysicalExpr>,
     input_data_type: DataType,
     result_data_type: DataType,
 }
 
 impl Avg {
     /// Create a new AVG aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>, name: impl Into<String>, 
data_type: DataType) -> Self {
+    pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
         let result_data_type = avg_return_type("avg", &data_type).unwrap();
 
         Self {
             name: name.into(),
             signature: Signature::user_defined(Immutable),
-            expr,
             input_data_type: data_type,
             result_data_type,
         }
@@ -139,20 +137,6 @@ impl AggregateUDFImpl for Avg {
     }
 }
 
-impl PartialEq<dyn Any> for Avg {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.input_data_type == x.input_data_type
-                    && self.result_data_type == x.result_data_type
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
 /// An accumulator to compute the average
 #[derive(Debug, Default)]
 pub struct AvgAccumulator {
diff --git a/native/spark-expr/src/avg_decimal.rs 
b/native/spark-expr/src/avg_decimal.rs
index 163e1560..05fc28e5 100644
--- a/native/spark-expr/src/avg_decimal.rs
+++ b/native/spark-expr/src/avg_decimal.rs
@@ -25,14 +25,13 @@ use arrow_array::{
 use arrow_schema::{DataType, Field};
 use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator, 
Signature};
 use datafusion_common::{not_impl_err, Result, ScalarValue};
-use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
+use datafusion_physical_expr::expressions::format_state_name;
 use std::{any::Any, sync::Arc};
 
 use crate::utils::is_valid_decimal_precision;
 use arrow_array::ArrowNativeTypeOp;
 use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, 
MIN_DECIMAL_FOR_EACH_PRECISION};
 use datafusion::logical_expr::Volatility::Immutable;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::type_coercion::aggregates::avg_return_type;
 use datafusion_expr::{AggregateUDFImpl, ReversedUDAF};
@@ -43,17 +42,15 @@ use DataType::*;
 #[derive(Debug, Clone)]
 pub struct AvgDecimal {
     signature: Signature,
-    expr: Arc<dyn PhysicalExpr>,
     sum_data_type: DataType,
     result_data_type: DataType,
 }
 
 impl AvgDecimal {
     /// Create a new AVG aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>, result_type: DataType, sum_type: 
DataType) -> Self {
+    pub fn new(result_type: DataType, sum_type: DataType) -> Self {
         Self {
             signature: Signature::user_defined(Immutable),
-            expr,
             result_data_type: result_type,
             sum_data_type: sum_type,
         }
@@ -156,19 +153,6 @@ impl AggregateUDFImpl for AvgDecimal {
     }
 }
 
-impl PartialEq<dyn Any> for AvgDecimal {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.sum_data_type == x.sum_data_type
-                    && self.result_data_type == x.result_data_type
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
 /// An accumulator to compute the average for decimals
 #[derive(Debug)]
 struct AvgDecimalAccumulator {
diff --git a/native/spark-expr/src/bitwise_not.rs 
b/native/spark-expr/src/bitwise_not.rs
index 36234935..d7c31836 100644
--- a/native/spark-expr/src/bitwise_not.rs
+++ b/native/spark-expr/src/bitwise_not.rs
@@ -15,21 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
-
 use arrow::{
     array::*,
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
 };
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion::{error::DataFusionError, logical_expr::ColumnarValue};
 use datafusion_common::Result;
 use datafusion_physical_expr::PhysicalExpr;
+use std::hash::Hash;
+use std::{any::Any, sync::Arc};
 
 macro_rules! compute_op {
     ($OPERAND:expr, $DT:ident) => {{
@@ -43,12 +38,24 @@ macro_rules! compute_op {
 }
 
 /// BitwiseNot expression
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct BitwiseNotExpr {
     /// Input expression
     arg: Arc<dyn PhysicalExpr>,
 }
 
+impl Hash for BitwiseNotExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.arg.hash(state);
+    }
+}
+
+impl PartialEq for BitwiseNotExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.arg.eq(&other.arg)
+    }
+}
+
 impl BitwiseNotExpr {
     /// Create new bitwise not expression
     pub fn new(arg: Arc<dyn PhysicalExpr>) -> Self {
@@ -114,21 +121,6 @@ impl PhysicalExpr for BitwiseNotExpr {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::new(BitwiseNotExpr::new(Arc::clone(&children[0]))))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.arg.hash(&mut s);
-        self.hash(&mut s);
-    }
-}
-
-impl PartialEq<dyn Any> for BitwiseNotExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.arg.eq(&x.arg))
-            .unwrap_or(false)
-    }
 }
 
 pub fn bitwise_not(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn 
PhysicalExpr>> {
diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index d96bcbbd..6e0e0915 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -39,7 +39,6 @@ use arrow_array::builder::StringBuilder;
 use arrow_array::{DictionaryArray, StringArray, StructArray};
 use arrow_schema::{DataType, Field, Schema};
 use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike};
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{
     cast::as_generic_string_array, internal_err, Result as DataFusionResult, 
ScalarValue,
 };
@@ -54,7 +53,7 @@ use std::str::FromStr;
 use std::{
     any::Any,
     fmt::{Debug, Display, Formatter},
-    hash::{Hash, Hasher},
+    hash::Hash,
     num::Wrapping,
     sync::Arc,
 };
@@ -131,13 +130,29 @@ impl TimeStampInfo {
     }
 }
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct Cast {
     pub child: Arc<dyn PhysicalExpr>,
     pub data_type: DataType,
     pub cast_options: SparkCastOptions,
 }
 
+impl PartialEq for Cast {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child)
+            && self.data_type.eq(&other.data_type)
+            && self.cast_options.eq(&other.cast_options)
+    }
+}
+
+impl Hash for Cast {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.data_type.hash(state);
+        self.cast_options.hash(state);
+    }
+}
+
 /// Determine if Comet supports a cast, taking options such as EvalMode and 
Timezone into account.
 pub fn cast_supported(
     from_type: &DataType,
@@ -1681,19 +1696,6 @@ impl Display for Cast {
     }
 }
 
-impl PartialEq<dyn Any> for Cast {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.child.eq(&x.child)
-                    && self.cast_options.eq(&x.cast_options)
-                    && self.data_type.eq(&x.data_type)
-            })
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for Cast {
     fn as_any(&self) -> &dyn Any {
         self
@@ -1729,14 +1731,6 @@ impl PhysicalExpr for Cast {
             _ => internal_err!("Cast should have exactly one child"),
         }
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.data_type.hash(&mut s);
-        self.cast_options.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 fn timestamp_parser<T: TimeZone>(
diff --git a/native/spark-expr/src/checkoverflow.rs 
b/native/spark-expr/src/checkoverflow.rs
index e922171b..528bbd5d 100644
--- a/native/spark-expr/src/checkoverflow.rs
+++ b/native/spark-expr/src/checkoverflow.rs
@@ -15,13 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    fmt::{Display, Formatter},
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
-
 use arrow::{
     array::{as_primitive_array, Array, ArrayRef, Decimal128Array},
     datatypes::{Decimal128Type, DecimalType},
@@ -29,21 +22,42 @@ use arrow::{
 };
 use arrow_schema::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_physical_expr::PhysicalExpr;
+use std::hash::Hash;
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+    sync::Arc,
+};
 
 /// This is from Spark `CheckOverflow` expression. Spark `CheckOverflow` 
expression rounds decimals
 /// to given scale and check if the decimals can fit in given precision. As 
`cast` kernel rounds
 /// decimals already, Comet `CheckOverflow` expression only checks if the 
decimals can fit in the
 /// precision.
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct CheckOverflow {
     pub child: Arc<dyn PhysicalExpr>,
     pub data_type: DataType,
     pub fail_on_error: bool,
 }
 
+impl Hash for CheckOverflow {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.data_type.hash(state);
+        self.fail_on_error.hash(state);
+    }
+}
+
+impl PartialEq for CheckOverflow {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child)
+            && self.data_type.eq(&other.data_type)
+            && self.fail_on_error.eq(&other.fail_on_error)
+    }
+}
+
 impl CheckOverflow {
     pub fn new(child: Arc<dyn PhysicalExpr>, data_type: DataType, 
fail_on_error: bool) -> Self {
         Self {
@@ -64,19 +78,6 @@ impl Display for CheckOverflow {
     }
 }
 
-impl PartialEq<dyn Any> for CheckOverflow {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.child.eq(&x.child)
-                    && self.data_type.eq(&x.data_type)
-                    && self.fail_on_error.eq(&x.fail_on_error)
-            })
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for CheckOverflow {
     fn as_any(&self) -> &dyn Any {
         self
@@ -162,12 +163,4 @@ impl PhysicalExpr for CheckOverflow {
             self.fail_on_error,
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.data_type.hash(&mut s);
-        self.fail_on_error.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
diff --git a/native/spark-expr/src/correlation.rs 
b/native/spark-expr/src/correlation.rs
index e5f36c6f..e4ddab95 100644
--- a/native/spark-expr/src/correlation.rs
+++ b/native/spark-expr/src/correlation.rs
@@ -26,13 +26,12 @@ use arrow::{
     datatypes::{DataType, Field},
 };
 use datafusion::logical_expr::Accumulator;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::type_coercion::aggregates::NUMERICS;
 use datafusion_expr::{AggregateUDFImpl, Signature, Volatility};
+use datafusion_physical_expr::expressions::format_state_name;
 use datafusion_physical_expr::expressions::StatsType;
-use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 
 /// CORR aggregate expression
 /// The implementation mostly is the same as the DataFusion's implementation. 
The reason
@@ -43,26 +42,16 @@ use 
datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 pub struct Correlation {
     name: String,
     signature: Signature,
-    expr1: Arc<dyn PhysicalExpr>,
-    expr2: Arc<dyn PhysicalExpr>,
     null_on_divide_by_zero: bool,
 }
 
 impl Correlation {
-    pub fn new(
-        expr1: Arc<dyn PhysicalExpr>,
-        expr2: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-        null_on_divide_by_zero: bool,
-    ) -> Self {
+    pub fn new(name: impl Into<String>, data_type: DataType, 
null_on_divide_by_zero: bool) -> Self {
         // the result of correlation just support FLOAT64 data type.
         assert!(matches!(data_type, DataType::Float64));
         Self {
             name: name.into(),
             signature: Signature::uniform(2, NUMERICS.to_vec(), 
Volatility::Immutable),
-            expr1,
-            expr2,
             null_on_divide_by_zero,
         }
     }
@@ -131,20 +120,6 @@ impl AggregateUDFImpl for Correlation {
     }
 }
 
-impl PartialEq<dyn Any> for Correlation {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.expr1.eq(&x.expr1)
-                    && self.expr2.eq(&x.expr2)
-                    && self.null_on_divide_by_zero == x.null_on_divide_by_zero
-            })
-            .unwrap_or(false)
-    }
-}
-
 /// An accumulator to compute correlation
 #[derive(Debug)]
 pub struct CorrelationAccumulator {
diff --git a/native/spark-expr/src/covariance.rs 
b/native/spark-expr/src/covariance.rs
index 9166e397..fa3563cd 100644
--- a/native/spark-expr/src/covariance.rs
+++ b/native/spark-expr/src/covariance.rs
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-use std::{any::Any, sync::Arc};
+use std::any::Any;
 
 use arrow::{
     array::{ArrayRef, Float64Array},
@@ -25,15 +25,14 @@ use arrow::{
     datatypes::{DataType, Field},
 };
 use datafusion::logical_expr::Accumulator;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{
     downcast_value, unwrap_or_internal_err, DataFusionError, Result, 
ScalarValue,
 };
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::type_coercion::aggregates::NUMERICS;
 use datafusion_expr::{AggregateUDFImpl, Signature, Volatility};
+use datafusion_physical_expr::expressions::format_state_name;
 use datafusion_physical_expr::expressions::StatsType;
-use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 
 /// COVAR_SAMP and COVAR_POP aggregate expression
 /// The implementation mostly is the same as the DataFusion's implementation. 
The reason
@@ -43,8 +42,6 @@ use 
datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 pub struct Covariance {
     name: String,
     signature: Signature,
-    expr1: Arc<dyn PhysicalExpr>,
-    expr2: Arc<dyn PhysicalExpr>,
     stats_type: StatsType,
     null_on_divide_by_zero: bool,
 }
@@ -52,8 +49,6 @@ pub struct Covariance {
 impl Covariance {
     /// Create a new COVAR aggregate function
     pub fn new(
-        expr1: Arc<dyn PhysicalExpr>,
-        expr2: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
         data_type: DataType,
         stats_type: StatsType,
@@ -64,8 +59,6 @@ impl Covariance {
         Self {
             name: name.into(),
             signature: Signature::uniform(2, NUMERICS.to_vec(), 
Volatility::Immutable),
-            expr1,
-            expr2,
             stats_type,
             null_on_divide_by_zero,
         }
@@ -126,21 +119,6 @@ impl AggregateUDFImpl for Covariance {
     }
 }
 
-impl PartialEq<dyn Any> for Covariance {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.expr1.eq(&x.expr1)
-                    && self.expr2.eq(&x.expr2)
-                    && self.stats_type == x.stats_type
-                    && self.null_on_divide_by_zero == x.null_on_divide_by_zero
-            })
-            .unwrap_or(false)
-    }
-}
-
 /// An accumulator to compute covariance
 #[derive(Debug)]
 pub struct CovarianceAccumulator {
diff --git a/native/spark-expr/src/if_expr.rs b/native/spark-expr/src/if_expr.rs
index 193a90fb..01c754ad 100644
--- a/native/spark-expr/src/if_expr.rs
+++ b/native/spark-expr/src/if_expr.rs
@@ -15,24 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
-
 use arrow::{
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
 };
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::Result;
 use datafusion_physical_expr::{expressions::CaseExpr, PhysicalExpr};
+use std::hash::Hash;
+use std::{any::Any, sync::Arc};
 
 /// IfExpr is a wrapper around CaseExpr, because `IF(a, b, c)` is semantically 
equivalent to
 /// `CASE WHEN a THEN b ELSE c END`.
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct IfExpr {
     if_expr: Arc<dyn PhysicalExpr>,
     true_expr: Arc<dyn PhysicalExpr>,
@@ -41,6 +36,23 @@ pub struct IfExpr {
     case_expr: Arc<CaseExpr>,
 }
 
+impl Hash for IfExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.if_expr.hash(state);
+        self.true_expr.hash(state);
+        self.false_expr.hash(state);
+        self.case_expr.hash(state);
+    }
+}
+impl PartialEq for IfExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.if_expr.eq(&other.if_expr)
+            && self.true_expr.eq(&other.true_expr)
+            && self.false_expr.eq(&other.false_expr)
+            && self.case_expr.eq(&other.case_expr)
+    }
+}
+
 impl std::fmt::Display for IfExpr {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(
@@ -106,27 +118,6 @@ impl PhysicalExpr for IfExpr {
             Arc::clone(&children[2]),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.if_expr.hash(&mut s);
-        self.true_expr.hash(&mut s);
-        self.false_expr.hash(&mut s);
-        self.hash(&mut s);
-    }
-}
-
-impl PartialEq<dyn Any> for IfExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.if_expr.eq(&x.if_expr)
-                    && self.true_expr.eq(&x.true_expr)
-                    && self.false_expr.eq(&x.false_expr)
-            })
-            .unwrap_or(false)
-    }
 }
 
 #[cfg(test)]
diff --git a/native/spark-expr/src/list.rs b/native/spark-expr/src/list.rs
index 7dc17b56..fc31b11a 100644
--- a/native/spark-expr/src/list.rs
+++ b/native/spark-expr/src/list.rs
@@ -26,16 +26,15 @@ use arrow_array::{
 };
 use arrow_schema::{DataType, Field, FieldRef, Schema};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{
     cast::{as_int32_array, as_large_list_array, as_list_array},
     internal_err, DataFusionError, Result as DataFusionResult, ScalarValue,
 };
 use datafusion_physical_expr::PhysicalExpr;
+use std::hash::Hash;
 use std::{
     any::Any,
     fmt::{Debug, Display, Formatter},
-    hash::{Hash, Hasher},
     sync::Arc,
 };
 
@@ -44,7 +43,7 @@ use std::{
 // 
https://github.com/apache/spark/blob/master/common/utils/src/main/java/org/apache/spark/unsafe/array/ByteArrayUtils.java
 const MAX_ROUNDED_ARRAY_LENGTH: usize = 2147483632;
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct ListExtract {
     child: Arc<dyn PhysicalExpr>,
     ordinal: Arc<dyn PhysicalExpr>,
@@ -53,6 +52,25 @@ pub struct ListExtract {
     fail_on_error: bool,
 }
 
+impl Hash for ListExtract {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.ordinal.hash(state);
+        self.default_value.hash(state);
+        self.one_based.hash(state);
+        self.fail_on_error.hash(state);
+    }
+}
+impl PartialEq for ListExtract {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child)
+            && self.ordinal.eq(&other.ordinal)
+            && self.default_value.eq(&other.default_value)
+            && self.one_based.eq(&other.one_based)
+            && self.fail_on_error.eq(&other.fail_on_error)
+    }
+}
+
 impl ListExtract {
     pub fn new(
         child: Arc<dyn PhysicalExpr>,
@@ -176,16 +194,6 @@ impl PhysicalExpr for ListExtract {
             _ => internal_err!("ListExtract should have exactly two children"),
         }
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.ordinal.hash(&mut s);
-        self.default_value.hash(&mut s);
-        self.one_based.hash(&mut s);
-        self.fail_on_error.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 fn one_based_index(index: i32, len: usize) -> DataFusionResult<Option<usize>> {
@@ -267,33 +275,24 @@ impl Display for ListExtract {
     }
 }
 
-impl PartialEq<dyn Any> for ListExtract {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.child.eq(&x.child)
-                    && self.ordinal.eq(&x.ordinal)
-                    && (self.default_value.is_none() == 
x.default_value.is_none())
-                    && self
-                        .default_value
-                        .as_ref()
-                        .zip(x.default_value.as_ref())
-                        .map(|(s, x)| s.eq(x))
-                        .unwrap_or(true)
-                    && self.one_based.eq(&x.one_based)
-                    && self.fail_on_error.eq(&x.fail_on_error)
-            })
-            .unwrap_or(false)
-    }
-}
-
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct GetArrayStructFields {
     child: Arc<dyn PhysicalExpr>,
     ordinal: usize,
 }
 
+impl Hash for GetArrayStructFields {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.ordinal.hash(state);
+    }
+}
+impl PartialEq for GetArrayStructFields {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
+    }
+}
+
 impl GetArrayStructFields {
     pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
         Self { child, ordinal }
@@ -379,13 +378,6 @@ impl PhysicalExpr for GetArrayStructFields {
             _ => internal_err!("GetArrayStructFields should have exactly one 
child"),
         }
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.ordinal.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 fn get_array_struct_fields<O: OffsetSizeTrait>(
@@ -417,16 +409,7 @@ impl Display for GetArrayStructFields {
     }
 }
 
-impl PartialEq<dyn Any> for GetArrayStructFields {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && self.ordinal.eq(&x.ordinal))
-            .unwrap_or(false)
-    }
-}
-
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct ArrayInsert {
     src_array_expr: Arc<dyn PhysicalExpr>,
     pos_expr: Arc<dyn PhysicalExpr>,
@@ -434,6 +417,23 @@ pub struct ArrayInsert {
     legacy_negative_index: bool,
 }
 
+impl Hash for ArrayInsert {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.src_array_expr.hash(state);
+        self.pos_expr.hash(state);
+        self.item_expr.hash(state);
+        self.legacy_negative_index.hash(state);
+    }
+}
+impl PartialEq for ArrayInsert {
+    fn eq(&self, other: &Self) -> bool {
+        self.src_array_expr.eq(&other.src_array_expr)
+            && self.pos_expr.eq(&other.pos_expr)
+            && self.item_expr.eq(&other.item_expr)
+            && self.legacy_negative_index.eq(&other.legacy_negative_index)
+    }
+}
+
 impl ArrayInsert {
     pub fn new(
         src_array_expr: Arc<dyn PhysicalExpr>,
@@ -555,15 +555,6 @@ impl PhysicalExpr for ArrayInsert {
             _ => internal_err!("ArrayInsert should have exactly three 
childrens"),
         }
     }
-
-    fn dyn_hash(&self, _state: &mut dyn Hasher) {
-        let mut s = _state;
-        self.src_array_expr.hash(&mut s);
-        self.pos_expr.hash(&mut s);
-        self.item_expr.hash(&mut s);
-        self.legacy_negative_index.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 fn array_insert<O: OffsetSizeTrait>(
@@ -694,20 +685,6 @@ impl Display for ArrayInsert {
     }
 }
 
-impl PartialEq<dyn Any> for ArrayInsert {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.src_array_expr.eq(&x.src_array_expr)
-                    && self.pos_expr.eq(&x.pos_expr)
-                    && self.item_expr.eq(&x.item_expr)
-                    && self.legacy_negative_index.eq(&x.legacy_negative_index)
-            })
-            .unwrap_or(false)
-    }
-}
-
 #[cfg(test)]
 mod test {
     use crate::list::{array_insert, list_extract, zero_based_index};
diff --git a/native/spark-expr/src/negative.rs 
b/native/spark-expr/src/negative.rs
index 3d9063e7..7fb50891 100644
--- a/native/spark-expr/src/negative.rs
+++ b/native/spark-expr/src/negative.rs
@@ -21,18 +21,14 @@ use arrow::{compute::kernels::numeric::neg_wrapping, 
datatypes::IntervalDayTimeT
 use arrow_array::RecordBatch;
 use arrow_buffer::IntervalDayTime;
 use arrow_schema::{DataType, Schema};
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion::{
     logical_expr::{interval_arithmetic::Interval, ColumnarValue},
     physical_expr::PhysicalExpr,
 };
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::sort_properties::ExprProperties;
-use std::{
-    any::Any,
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
+use std::hash::Hash;
+use std::{any::Any, sync::Arc};
 
 pub fn create_negate_expr(
     expr: Arc<dyn PhysicalExpr>,
@@ -42,13 +38,26 @@ pub fn create_negate_expr(
 }
 
 /// Negative expression
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct NegativeExpr {
     /// Input expression
     arg: Arc<dyn PhysicalExpr>,
     fail_on_error: bool,
 }
 
+impl Hash for NegativeExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.arg.hash(state);
+        self.fail_on_error.hash(state);
+    }
+}
+
+impl PartialEq for NegativeExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.arg.eq(&other.arg) && self.fail_on_error.eq(&other.fail_on_error)
+    }
+}
+
 macro_rules! check_overflow {
     ($array:expr, $array_type:ty, $min_val:expr, $type_name:expr) => {{
         let typed_array = $array
@@ -204,11 +213,6 @@ impl PhysicalExpr for NegativeExpr {
         )))
     }
 
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.hash(&mut s);
-    }
-
     /// Given the child interval of a NegativeExpr, it calculates the 
NegativeExpr's interval.
     /// It replaces the upper and lower bounds after multiplying them with -1.
     /// Ex: `(a, b]` => `[-b, -a)`
@@ -255,12 +259,3 @@ impl PhysicalExpr for NegativeExpr {
         Ok(properties)
     }
 }
-
-impl PartialEq<dyn Any> for NegativeExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.arg.eq(&x.arg))
-            .unwrap_or(false)
-    }
-}
diff --git a/native/spark-expr/src/normalize_nan.rs 
b/native/spark-expr/src/normalize_nan.rs
index c5331ad7..078ce4b5 100644
--- a/native/spark-expr/src/normalize_nan.rs
+++ b/native/spark-expr/src/normalize_nan.rs
@@ -15,13 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    fmt::{Display, Formatter},
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
-
 use arrow::{
     array::{as_primitive_array, ArrayAccessor, ArrayIter, Float32Array, 
Float64Array},
     datatypes::{ArrowNativeType, Float32Type, Float64Type},
@@ -29,15 +22,33 @@ use arrow::{
 };
 use arrow_schema::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_physical_expr::PhysicalExpr;
+use std::hash::Hash;
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+    sync::Arc,
+};
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct NormalizeNaNAndZero {
     pub data_type: DataType,
     pub child: Arc<dyn PhysicalExpr>,
 }
 
+impl PartialEq for NormalizeNaNAndZero {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.data_type.eq(&other.data_type)
+    }
+}
+
+impl Hash for NormalizeNaNAndZero {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.data_type.hash(state);
+    }
+}
+
 impl NormalizeNaNAndZero {
     pub fn new(data_type: DataType, child: Arc<dyn PhysicalExpr>) -> Self {
         Self { data_type, child }
@@ -89,13 +100,6 @@ impl PhysicalExpr for NormalizeNaNAndZero {
             Arc::clone(&children[0]),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.data_type.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 fn eval_typed<V: FloatDouble, T: ArrayAccessor<Item = V>>(input: T) -> 
Vec<Option<V>> {
@@ -120,15 +124,6 @@ impl Display for NormalizeNaNAndZero {
     }
 }
 
-impl PartialEq<dyn Any> for NormalizeNaNAndZero {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && 
self.data_type.eq(&x.data_type))
-            .unwrap_or(false)
-    }
-}
-
 trait FloatDouble: ArrowNativeType {
     fn is_nan(&self) -> bool;
     fn nan(&self) -> Self;
diff --git a/native/spark-expr/src/regexp.rs b/native/spark-expr/src/regexp.rs
index c7626285..7f367a8b 100644
--- a/native/spark-expr/src/regexp.rs
+++ b/native/spark-expr/src/regexp.rs
@@ -21,7 +21,7 @@ use arrow_array::builder::BooleanBuilder;
 use arrow_array::types::Int32Type;
 use arrow_array::{Array, BooleanArray, DictionaryArray, RecordBatch, 
StringArray};
 use arrow_schema::{DataType, Schema};
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
+use datafusion::physical_expr_common::physical_expr::DynEq;
 use datafusion_common::{internal_err, Result};
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr::PhysicalExpr;
@@ -53,6 +53,16 @@ impl Hash for RLike {
     }
 }
 
+impl DynEq for RLike {
+    fn dyn_eq(&self, other: &dyn Any) -> bool {
+        if let Some(other) = other.downcast_ref::<Self>() {
+            self.pattern_str == other.pattern_str
+        } else {
+            false
+        }
+    }
+}
+
 impl RLike {
     pub fn try_new(child: Arc<dyn PhysicalExpr>, pattern: &str) -> 
Result<Self> {
         Ok(Self {
@@ -93,15 +103,6 @@ impl Display for RLike {
     }
 }
 
-impl PartialEq<dyn Any> for RLike {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && 
self.pattern_str.eq(&x.pattern_str))
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for RLike {
     fn as_any(&self) -> &dyn Any {
         self
@@ -161,10 +162,4 @@ impl PhysicalExpr for RLike {
             &self.pattern_str,
         )?))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        use std::hash::Hash;
-        let mut s = state;
-        self.hash(&mut s);
-    }
 }
diff --git a/native/spark-expr/src/stddev.rs b/native/spark-expr/src/stddev.rs
index 3cf604da..1ec5ffb6 100644
--- a/native/spark-expr/src/stddev.rs
+++ b/native/spark-expr/src/stddev.rs
@@ -23,12 +23,12 @@ use arrow::{
     datatypes::{DataType, Field},
 };
 use datafusion::logical_expr::Accumulator;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
+use datafusion_common::types::NativeType;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::{AggregateUDFImpl, Signature, Volatility};
+use datafusion_physical_expr::expressions::format_state_name;
 use datafusion_physical_expr::expressions::StatsType;
-use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 
 /// STDDEV and STDDEV_SAMP (standard deviation) aggregate expression
 /// The implementation mostly is the same as the DataFusion's implementation. 
The reason
@@ -39,7 +39,6 @@ use 
datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 pub struct Stddev {
     name: String,
     signature: Signature,
-    expr: Arc<dyn PhysicalExpr>,
     stats_type: StatsType,
     null_on_divide_by_zero: bool,
 }
@@ -47,7 +46,6 @@ pub struct Stddev {
 impl Stddev {
     /// Create a new STDDEV aggregate function
     pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
         data_type: DataType,
         stats_type: StatsType,
@@ -57,8 +55,14 @@ impl Stddev {
         assert!(matches!(data_type, DataType::Float64));
         Self {
             name: name.into(),
-            signature: Signature::coercible(vec![DataType::Float64], 
Volatility::Immutable),
-            expr,
+            signature: Signature::coercible(
+                vec![
+                    
datafusion_expr_common::signature::TypeSignatureClass::Native(Arc::new(
+                        NativeType::Float64,
+                    )),
+                ],
+                Volatility::Immutable,
+            ),
             stats_type,
             null_on_divide_by_zero,
         }
@@ -121,20 +125,6 @@ impl AggregateUDFImpl for Stddev {
     }
 }
 
-impl PartialEq<dyn Any> for Stddev {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.expr.eq(&x.expr)
-                    && self.null_on_divide_by_zero == x.null_on_divide_by_zero
-                    && self.stats_type == x.stats_type
-            })
-            .unwrap_or(false)
-    }
-}
-
 /// An accumulator to compute the standard deviation
 #[derive(Debug)]
 pub struct StddevAccumulator {
diff --git a/native/spark-expr/src/strings.rs b/native/spark-expr/src/strings.rs
index a8aab6ae..c2706b58 100644
--- a/native/spark-expr/src/strings.rs
+++ b/native/spark-expr/src/strings.rs
@@ -27,19 +27,18 @@ use arrow::{
 };
 use arrow_schema::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, ScalarValue::Utf8};
 use datafusion_physical_expr::PhysicalExpr;
 use std::{
     any::Any,
     fmt::{Display, Formatter},
-    hash::{Hash, Hasher},
+    hash::Hash,
     sync::Arc,
 };
 
 macro_rules! make_predicate_function {
     ($name: ident, $kernel: ident, $str_scalar_kernel: ident) => {
-        #[derive(Debug, Hash)]
+        #[derive(Debug, Eq)]
         pub struct $name {
             left: Arc<dyn PhysicalExpr>,
             right: Arc<dyn PhysicalExpr>,
@@ -57,12 +56,16 @@ macro_rules! make_predicate_function {
             }
         }
 
-        impl PartialEq<dyn Any> for $name {
-            fn eq(&self, other: &dyn Any) -> bool {
-                down_cast_any_ref(other)
-                    .downcast_ref::<Self>()
-                    .map(|x| self.left.eq(&x.left) && self.right.eq(&x.right))
-                    .unwrap_or(false)
+        impl Hash for $name {
+            fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+                self.left.hash(state);
+                self.right.hash(state);
+            }
+        }
+
+        impl PartialEq for $name {
+            fn eq(&self, other: &Self) -> bool {
+                self.left.eq(&other.left) && self.right.eq(&other.right)
             }
         }
 
@@ -122,13 +125,6 @@ macro_rules! make_predicate_function {
                     children[1].clone(),
                 )))
             }
-
-            fn dyn_hash(&self, state: &mut dyn Hasher) {
-                let mut s = state;
-                self.left.hash(&mut s);
-                self.right.hash(&mut s);
-                self.hash(&mut s);
-            }
         }
     };
 }
@@ -141,18 +137,43 @@ make_predicate_function!(EndsWith, ends_with_dyn, 
ends_with_utf8_scalar_dyn);
 
 make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn);
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct SubstringExpr {
     pub child: Arc<dyn PhysicalExpr>,
     pub start: i64,
     pub len: u64,
 }
 
-#[derive(Debug, Hash)]
+impl Hash for SubstringExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.start.hash(state);
+        self.len.hash(state);
+    }
+}
+
+impl PartialEq for SubstringExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.start.eq(&other.start) && 
self.len.eq(&other.len)
+    }
+}
+#[derive(Debug, Eq)]
 pub struct StringSpaceExpr {
     pub child: Arc<dyn PhysicalExpr>,
 }
 
+impl Hash for StringSpaceExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+    }
+}
+
+impl PartialEq for StringSpaceExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child)
+    }
+}
+
 impl SubstringExpr {
     pub fn new(child: Arc<dyn PhysicalExpr>, start: i64, len: u64) -> Self {
         Self { child, start, len }
@@ -181,15 +202,6 @@ impl Display for StringSpaceExpr {
     }
 }
 
-impl PartialEq<dyn Any> for SubstringExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && self.start.eq(&x.start) && 
self.len.eq(&x.len))
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for SubstringExpr {
     fn as_any(&self) -> &dyn Any {
         self
@@ -231,23 +243,6 @@ impl PhysicalExpr for SubstringExpr {
             self.len,
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.start.hash(&mut s);
-        self.len.hash(&mut s);
-        self.hash(&mut s);
-    }
-}
-
-impl PartialEq<dyn Any> for StringSpaceExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child))
-            .unwrap_or(false)
-    }
 }
 
 impl PhysicalExpr for StringSpaceExpr {
@@ -292,10 +287,4 @@ impl PhysicalExpr for StringSpaceExpr {
     ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::new(StringSpaceExpr::new(Arc::clone(&children[0]))))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
diff --git a/native/spark-expr/src/structs.rs b/native/spark-expr/src/structs.rs
index cda8246d..7cc49e42 100644
--- a/native/spark-expr/src/structs.rs
+++ b/native/spark-expr/src/structs.rs
@@ -19,17 +19,16 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::{Array, StructArray};
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, Result as DataFusionResult, 
ScalarValue};
 use datafusion_physical_expr::PhysicalExpr;
 use std::{
     any::Any,
     fmt::{Display, Formatter},
-    hash::{Hash, Hasher},
+    hash::Hash,
     sync::Arc,
 };
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Hash, PartialEq, Eq)]
 pub struct CreateNamedStruct {
     values: Vec<Arc<dyn PhysicalExpr>>,
     names: Vec<String>,
@@ -95,13 +94,6 @@ impl PhysicalExpr for CreateNamedStruct {
             self.names.clone(),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.values.hash(&mut s);
-        self.names.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 impl Display for CreateNamedStruct {
@@ -114,29 +106,24 @@ impl Display for CreateNamedStruct {
     }
 }
 
-impl PartialEq<dyn Any> for CreateNamedStruct {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.values
-                    .iter()
-                    .zip(x.values.iter())
-                    .all(|(a, b)| a.eq(b))
-                    && self.values.len() == x.values.len()
-                    && self.names.iter().zip(x.names.iter()).all(|(a, b)| 
a.eq(b))
-                    && self.names.len() == x.names.len()
-            })
-            .unwrap_or(false)
-    }
-}
-
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct GetStructField {
     child: Arc<dyn PhysicalExpr>,
     ordinal: usize,
 }
 
+impl Hash for GetStructField {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.ordinal.hash(state);
+    }
+}
+impl PartialEq for GetStructField {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
+    }
+}
+
 impl GetStructField {
     pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
         Self { child, ordinal }
@@ -203,13 +190,6 @@ impl PhysicalExpr for GetStructField {
             self.ordinal,
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.ordinal.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 impl Display for GetStructField {
@@ -222,15 +202,6 @@ impl Display for GetStructField {
     }
 }
 
-impl PartialEq<dyn Any> for GetStructField {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && self.ordinal.eq(&x.ordinal))
-            .unwrap_or(false)
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::CreateNamedStruct;
diff --git a/native/spark-expr/src/sum_decimal.rs 
b/native/spark-expr/src/sum_decimal.rs
index ab142aee..f3f34d9b 100644
--- a/native/spark-expr/src/sum_decimal.rs
+++ b/native/spark-expr/src/sum_decimal.rs
@@ -25,20 +25,16 @@ use arrow_array::{
 };
 use arrow_schema::{DataType, Field};
 use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, Result as DFResult, ScalarValue};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::Volatility::Immutable;
 use datafusion_expr::{AggregateUDFImpl, ReversedUDAF, Signature};
-use datafusion_physical_expr::PhysicalExpr;
 use std::{any::Any, ops::BitAnd, sync::Arc};
 
 #[derive(Debug)]
 pub struct SumDecimal {
     /// Aggregate function signature
     signature: Signature,
-    /// The expression that provides the input decimal values to be summed
-    expr: Arc<dyn PhysicalExpr>,
     /// The data type of the SUM result. This will always be a decimal type
     /// with the same precision and scale as specified in this struct
     result_type: DataType,
@@ -49,7 +45,7 @@ pub struct SumDecimal {
 }
 
 impl SumDecimal {
-    pub fn try_new(expr: Arc<dyn PhysicalExpr>, data_type: DataType) -> 
DFResult<Self> {
+    pub fn try_new(data_type: DataType) -> DFResult<Self> {
         // The `data_type` is the SUM result type passed from Spark side
         let (precision, scale) = match data_type {
             DataType::Decimal128(p, s) => (p, s),
@@ -61,7 +57,6 @@ impl SumDecimal {
         };
         Ok(Self {
             signature: Signature::user_defined(Immutable),
-            expr,
             result_type: data_type,
             precision,
             scale,
@@ -132,20 +127,6 @@ impl AggregateUDFImpl for SumDecimal {
     }
 }
 
-impl PartialEq<dyn Any> for SumDecimal {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                // note that we do not compare result_type because this
-                // is guaranteed to match if the precision and scale
-                // match
-                self.precision == x.precision && self.scale == x.scale && 
self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
 #[derive(Debug)]
 struct SumDecimalAccumulator {
     sum: i128,
@@ -491,13 +472,13 @@ mod tests {
     use datafusion_common::Result;
     use datafusion_expr::AggregateUDF;
     use datafusion_physical_expr::aggregate::AggregateExprBuilder;
-    use datafusion_physical_expr::expressions::{Column, Literal};
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr::PhysicalExpr;
     use futures::StreamExt;
 
     #[test]
     fn invalid_data_type() {
-        let expr = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
-        assert!(SumDecimal::try_new(expr, DataType::Int32).is_err());
+        assert!(SumDecimal::try_new(DataType::Int32).is_err());
     }
 
     #[tokio::test]
@@ -518,7 +499,6 @@ mod tests {
             Arc::new(MemoryExec::try_new(partitions, Arc::clone(&schema), 
None).unwrap());
 
         let aggregate_udf = 
Arc::new(AggregateUDF::new_from_impl(SumDecimal::try_new(
-            Arc::clone(&c1),
             data_type.clone(),
         )?));
 
diff --git a/native/spark-expr/src/temporal.rs 
b/native/spark-expr/src/temporal.rs
index 91953dd6..fb549f9c 100644
--- a/native/spark-expr/src/temporal.rs
+++ b/native/spark-expr/src/temporal.rs
@@ -15,36 +15,45 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    fmt::{Debug, Display, Formatter},
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
-
+use crate::utils::array_with_timezone;
 use arrow::{
     compute::{date_part, DatePart},
     record_batch::RecordBatch,
 };
 use arrow_schema::{DataType, Schema, TimeUnit::Microsecond};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, ScalarValue::Utf8};
 use datafusion_physical_expr::PhysicalExpr;
-
-use crate::utils::array_with_timezone;
+use std::hash::Hash;
+use std::{
+    any::Any,
+    fmt::{Debug, Display, Formatter},
+    sync::Arc,
+};
 
 use crate::kernels::temporal::{
     date_trunc_array_fmt_dyn, date_trunc_dyn, timestamp_trunc_array_fmt_dyn, 
timestamp_trunc_dyn,
 };
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct HourExpr {
     /// An array with DataType::Timestamp(TimeUnit::Microsecond, None)
     child: Arc<dyn PhysicalExpr>,
     timezone: String,
 }
 
+impl Hash for HourExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.timezone.hash(state);
+    }
+}
+impl PartialEq for HourExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.timezone.eq(&other.timezone)
+    }
+}
+
 impl HourExpr {
     pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self {
         HourExpr { child, timezone }
@@ -61,15 +70,6 @@ impl Display for HourExpr {
     }
 }
 
-impl PartialEq<dyn Any> for HourExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && self.timezone.eq(&x.timezone))
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for HourExpr {
     fn as_any(&self) -> &dyn Any {
         self
@@ -123,22 +123,27 @@ impl PhysicalExpr for HourExpr {
             self.timezone.clone(),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.timezone.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct MinuteExpr {
     /// An array with DataType::Timestamp(TimeUnit::Microsecond, None)
     child: Arc<dyn PhysicalExpr>,
     timezone: String,
 }
 
+impl Hash for MinuteExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.timezone.hash(state);
+    }
+}
+impl PartialEq for MinuteExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.timezone.eq(&other.timezone)
+    }
+}
+
 impl MinuteExpr {
     pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self {
         MinuteExpr { child, timezone }
@@ -155,15 +160,6 @@ impl Display for MinuteExpr {
     }
 }
 
-impl PartialEq<dyn Any> for MinuteExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && self.timezone.eq(&x.timezone))
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for MinuteExpr {
     fn as_any(&self) -> &dyn Any {
         self
@@ -217,22 +213,27 @@ impl PhysicalExpr for MinuteExpr {
             self.timezone.clone(),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.timezone.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct SecondExpr {
     /// An array with DataType::Timestamp(TimeUnit::Microsecond, None)
     child: Arc<dyn PhysicalExpr>,
     timezone: String,
 }
 
+impl Hash for SecondExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.timezone.hash(state);
+    }
+}
+impl PartialEq for SecondExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.timezone.eq(&other.timezone)
+    }
+}
+
 impl SecondExpr {
     pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self {
         SecondExpr { child, timezone }
@@ -249,15 +250,6 @@ impl Display for SecondExpr {
     }
 }
 
-impl PartialEq<dyn Any> for SecondExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && self.timezone.eq(&x.timezone))
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for SecondExpr {
     fn as_any(&self) -> &dyn Any {
         self
@@ -311,16 +303,9 @@ impl PhysicalExpr for SecondExpr {
             self.timezone.clone(),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.timezone.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct DateTruncExpr {
     /// An array with DataType::Date32
     child: Arc<dyn PhysicalExpr>,
@@ -328,6 +313,18 @@ pub struct DateTruncExpr {
     format: Arc<dyn PhysicalExpr>,
 }
 
+impl Hash for DateTruncExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.format.hash(state);
+    }
+}
+impl PartialEq for DateTruncExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.format.eq(&other.format)
+    }
+}
+
 impl DateTruncExpr {
     pub fn new(child: Arc<dyn PhysicalExpr>, format: Arc<dyn PhysicalExpr>) -> Self {
         DateTruncExpr { child, format }
@@ -344,15 +341,6 @@ impl Display for DateTruncExpr {
     }
 }
 
-impl PartialEq<dyn Any> for DateTruncExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.child.eq(&x.child) && self.format.eq(&x.format))
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for DateTruncExpr {
     fn as_any(&self) -> &dyn Any {
         self
@@ -398,16 +386,9 @@ impl PhysicalExpr for DateTruncExpr {
             Arc::clone(&self.format),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.format.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct TimestampTruncExpr {
     /// An array with DataType::Timestamp(TimeUnit::Microsecond, None)
     child: Arc<dyn PhysicalExpr>,
@@ -422,6 +403,21 @@ pub struct TimestampTruncExpr {
     timezone: String,
 }
 
+impl Hash for TimestampTruncExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.format.hash(state);
+        self.timezone.hash(state);
+    }
+}
+impl PartialEq for TimestampTruncExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child)
+            && self.format.eq(&other.format)
+            && self.timezone.eq(&other.timezone)
+    }
+}
+
 impl TimestampTruncExpr {
     pub fn new(
         child: Arc<dyn PhysicalExpr>,
@@ -446,19 +442,6 @@ impl Display for TimestampTruncExpr {
     }
 }
 
-impl PartialEq<dyn Any> for TimestampTruncExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.child.eq(&x.child)
-                    && self.format.eq(&x.format)
-                    && self.timezone.eq(&x.timezone)
-            })
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for TimestampTruncExpr {
     fn as_any(&self) -> &dyn Any {
         self
@@ -524,12 +507,4 @@ impl PhysicalExpr for TimestampTruncExpr {
             self.timezone.clone(),
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.child.hash(&mut s);
-        self.format.hash(&mut s);
-        self.timezone.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs
index 1f68eb86..91b46c6f 100644
--- a/native/spark-expr/src/to_json.rs
+++ b/native/spark-expr/src/to_json.rs
@@ -29,11 +29,11 @@ use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr::PhysicalExpr;
 use std::any::Any;
 use std::fmt::{Debug, Display, Formatter};
-use std::hash::{Hash, Hasher};
+use std::hash::Hash;
 use std::sync::Arc;
 
 /// to_json function
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct ToJson {
     /// The input to convert to JSON
     expr: Arc<dyn PhysicalExpr>,
@@ -41,6 +41,18 @@ pub struct ToJson {
     timezone: String,
 }
 
+impl Hash for ToJson {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.expr.hash(state);
+        self.timezone.hash(state);
+    }
+}
+impl PartialEq for ToJson {
+    fn eq(&self, other: &Self) -> bool {
+        self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
+    }
+}
+
 impl ToJson {
     pub fn new(expr: Arc<dyn PhysicalExpr>, timezone: &str) -> Self {
         Self {
@@ -101,13 +113,6 @@ impl PhysicalExpr for ToJson {
             &self.timezone,
         )))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.expr.hash(&mut s);
-        self.timezone.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
 
 /// Convert an array into a JSON value string representation
diff --git a/native/spark-expr/src/unbound.rs b/native/spark-expr/src/unbound.rs
index a6babd0f..14f68c9c 100644
--- a/native/spark-expr/src/unbound.rs
+++ b/native/spark-expr/src/unbound.rs
@@ -17,15 +17,10 @@
 
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Schema};
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion::physical_plan::ColumnarValue;
 use datafusion_common::{internal_err, Result};
 use datafusion_physical_expr::PhysicalExpr;
-use std::{
-    any::Any,
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
+use std::{hash::Hash, sync::Arc};
 
 /// This is similar to `UnKnownColumn` in DataFusion, but it has data type.
 /// This is only used when the column is not bound to a schema, for example, the
@@ -93,18 +88,4 @@ impl PhysicalExpr for UnboundColumn {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(self)
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.hash(&mut s);
-    }
-}
-
-impl PartialEq<dyn Any> for UnboundColumn {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self == x)
-            .unwrap_or(false)
-    }
 }
diff --git a/native/spark-expr/src/variance.rs b/native/spark-expr/src/variance.rs
index 4370d89f..e71d713f 100644
--- a/native/spark-expr/src/variance.rs
+++ b/native/spark-expr/src/variance.rs
@@ -15,20 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{any::Any, sync::Arc};
+use std::any::Any;
 
 use arrow::{
     array::{ArrayRef, Float64Array},
     datatypes::{DataType, Field},
 };
 use datafusion::logical_expr::Accumulator;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::Volatility::Immutable;
 use datafusion_expr::{AggregateUDFImpl, Signature};
+use datafusion_physical_expr::expressions::format_state_name;
 use datafusion_physical_expr::expressions::StatsType;
-use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 
 /// VAR_SAMP and VAR_POP aggregate expression
 /// The implementation mostly is the same as the DataFusion's implementation. The reason
@@ -39,7 +38,6 @@ use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 pub struct Variance {
     name: String,
     signature: Signature,
-    expr: Arc<dyn PhysicalExpr>,
     stats_type: StatsType,
     null_on_divide_by_zero: bool,
 }
@@ -47,7 +45,6 @@ pub struct Variance {
 impl Variance {
     /// Create a new VARIANCE aggregate function
     pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
         data_type: DataType,
         stats_type: StatsType,
@@ -58,7 +55,6 @@ impl Variance {
         Self {
             name: name.into(),
             signature: Signature::numeric(1, Immutable),
-            expr,
             stats_type,
             null_on_divide_by_zero,
         }
@@ -118,17 +114,6 @@ impl AggregateUDFImpl for Variance {
     }
 }
 
-impl PartialEq<dyn Any> for Variance {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name && self.expr.eq(&x.expr) && self.stats_type == x.stats_type
-            })
-            .unwrap_or(false)
-    }
-}
-
 /// An accumulator to compute variance
 #[derive(Debug)]
 pub struct VarianceAccumulator {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to