This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 197921811 fix: use common implementation of handling object store and hdfs urls for native_datafusion and native_iceberg_compat (#1494)
197921811 is described below

commit 1979218115440a27dd677d604fe4a29f37e792b2
Author: Parth Chandra <[email protected]>
AuthorDate: Thu Mar 13 08:25:37 2025 -0700

    fix: use common implementation of handling object store and hdfs urls for native_datafusion and native_iceberg_compat (#1494)
    
    * fix: use common implementation of handling object store and hdfs urls for native_datafusion and native_iceberg_compat
    
    * clippy
    
    * remove commented code
    
    * address review comments
    
    * fix unit test for hdfs feature
---
 native/Cargo.lock                          | 624 ++++++++++++++++++++++++++++-
 native/Cargo.toml                          |   2 +-
 native/core/src/execution/planner.rs       |  14 +-
 native/core/src/parquet/mod.rs             |  11 +-
 native/core/src/parquet/parquet_support.rs | 159 ++++++--
 5 files changed, 765 insertions(+), 45 deletions(-)

diff --git a/native/Cargo.lock b/native/Cargo.lock
index faa28b13c..845c4314c 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -359,6 +359,12 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "atomic-waker"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
+
 [[package]]
 name = "autocfg"
 version = "1.4.0"
@@ -415,7 +421,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "regex",
- "rustc-hash",
+ "rustc-hash 1.1.0",
  "shlex",
  "syn 1.0.109",
  "which",
@@ -547,6 +553,12 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "cfg_aliases"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
+
 [[package]]
 name = "chrono"
 version = "0.4.39"
@@ -556,6 +568,7 @@ dependencies = [
  "android-tzdata",
  "iana-time-zone",
  "num-traits",
+ "serde",
  "windows-targets 0.52.6",
 ]
 
@@ -711,6 +724,16 @@ version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
 
+[[package]]
+name = "core-foundation"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
 [[package]]
 name = "core-foundation-sys"
 version = "0.8.7"
@@ -992,7 +1015,7 @@ dependencies = [
  "simd-adler32",
  "snap",
  "tempfile",
- "thiserror",
+ "thiserror 1.0.69",
  "tokio",
  "url",
  "zstd 0.11.2+zstd.1.5.2",
@@ -1042,7 +1065,7 @@ dependencies = [
  "parquet",
  "rand",
  "regex",
- "thiserror",
+ "thiserror 1.0.69",
  "tokio",
  "twox-hash 2.1.0",
 ]
@@ -1711,8 +1734,10 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
 dependencies = [
  "cfg-if",
+ "js-sys",
  "libc",
  "wasi 0.11.0+wasi-snapshot-preview1",
+ "wasm-bindgen",
 ]
 
 [[package]]
@@ -1739,6 +1764,25 @@ version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
 
+[[package]]
+name = "h2"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
+dependencies = [
+ "atomic-waker",
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "http",
+ "indexmap",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
 [[package]]
 name = "half"
 version = "2.4.1"
@@ -1802,12 +1846,109 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "http"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
+[[package]]
+name = "http-body"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
+dependencies = [
+ "bytes",
+ "http",
+]
+
+[[package]]
+name = "http-body-util"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "httparse"
+version = "1.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
+
 [[package]]
 name = "humantime"
 version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
 
+[[package]]
+name = "hyper"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "httparse",
+ "itoa",
+ "pin-project-lite",
+ "smallvec",
+ "tokio",
+ "want",
+]
+
+[[package]]
+name = "hyper-rustls"
+version = "0.27.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2"
+dependencies = [
+ "futures-util",
+ "http",
+ "hyper",
+ "hyper-util",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-pki-types",
+ "tokio",
+ "tokio-rustls",
+ "tower-service",
+]
+
+[[package]]
+name = "hyper-util"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "pin-project-lite",
+ "socket2",
+ "tokio",
+ "tower-service",
+ "tracing",
+]
+
 [[package]]
 name = "iana-time-zone"
 version = "0.1.61"
@@ -1993,7 +2134,7 @@ dependencies = [
  "log",
  "num-format",
  "once_cell",
- "quick-xml",
+ "quick-xml 0.26.0",
  "rgb",
  "str_stack",
 ]
@@ -2004,6 +2145,12 @@ version = "3.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
 
+[[package]]
+name = "ipnet"
+version = "2.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
+
 [[package]]
 name = "is-terminal"
 version = "0.4.15"
@@ -2079,7 +2226,7 @@ dependencies = [
  "jni-sys",
  "libloading 0.7.4",
  "log",
- "thiserror",
+ "thiserror 1.0.69",
  "walkdir",
  "windows-sys 0.45.0",
 ]
@@ -2286,7 +2433,7 @@ dependencies = [
  "serde-value",
  "serde_json",
  "serde_yaml",
- "thiserror",
+ "thiserror 1.0.69",
  "thread-id",
  "typemap-ors",
  "winapi",
@@ -2335,6 +2482,12 @@ dependencies = [
  "libmimalloc-sys",
 ]
 
+[[package]]
+name = "mime"
+version = "0.3.17"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
+
 [[package]]
 name = "minimal-lexical"
 version = "0.2.1"
@@ -2350,6 +2503,17 @@ dependencies = [
  "adler2",
 ]
 
+[[package]]
+name = "mio"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
+dependencies = [
+ "libc",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "multimap"
 version = "0.8.3"
@@ -2477,13 +2641,24 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf"
 dependencies = [
  "async-trait",
+ "base64",
  "bytes",
  "chrono",
  "futures",
+ "httparse",
  "humantime",
+ "hyper",
  "itertools 0.13.0",
+ "md-5",
  "parking_lot",
  "percent-encoding",
+ "quick-xml 0.37.2",
+ "rand",
+ "reqwest",
+ "ring",
+ "rustls-pemfile",
+ "serde",
+ "serde_json",
  "snafu",
  "tokio",
  "tracing",
@@ -2503,6 +2678,12 @@ version = "11.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
 
+[[package]]
+name = "openssl-probe"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
+
 [[package]]
 name = "ordered-float"
 version = "2.10.1"
@@ -2722,7 +2903,7 @@ dependencies = [
  "smallvec",
  "symbolic-demangle",
  "tempfile",
- "thiserror",
+ "thiserror 1.0.69",
 ]
 
 [[package]]
@@ -2837,6 +3018,68 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "quick-xml"
+version = "0.37.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "165859e9e55f79d67b96c5d96f4e88b6f2695a1972849c15a6a3f5c59fc2c003"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quinn"
+version = "0.11.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef"
+dependencies = [
+ "bytes",
+ "pin-project-lite",
+ "quinn-proto",
+ "quinn-udp",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "socket2",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "quinn-proto"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d"
+dependencies = [
+ "bytes",
+ "getrandom 0.2.15",
+ "rand",
+ "ring",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "rustls-pki-types",
+ "slab",
+ "thiserror 2.0.12",
+ "tinyvec",
+ "tracing",
+ "web-time",
+]
+
+[[package]]
+name = "quinn-udp"
+version = "0.5.10"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944"
+dependencies = [
+ "cfg_aliases",
+ "libc",
+ "once_cell",
+ "socket2",
+ "tracing",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "quote"
 version = "1.0.38"
@@ -2954,6 +3197,52 @@ version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
 
+[[package]]
+name = "reqwest"
+version = "0.12.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da"
+dependencies = [
+ "base64",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "http-body-util",
+ "hyper",
+ "hyper-rustls",
+ "hyper-util",
+ "ipnet",
+ "js-sys",
+ "log",
+ "mime",
+ "once_cell",
+ "percent-encoding",
+ "pin-project-lite",
+ "quinn",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-pemfile",
+ "rustls-pki-types",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "sync_wrapper",
+ "tokio",
+ "tokio-rustls",
+ "tokio-util",
+ "tower",
+ "tower-service",
+ "url",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "wasm-streams",
+ "web-sys",
+ "windows-registry",
+]
+
 [[package]]
 name = "rgb"
 version = "0.8.50"
@@ -2963,6 +3252,20 @@ dependencies = [
  "bytemuck",
 ]
 
+[[package]]
+name = "ring"
+version = "0.17.13"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "getrandom 0.2.15",
+ "libc",
+ "untrusted",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.24"
@@ -2975,6 +3278,12 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
 
+[[package]]
+name = "rustc-hash"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
+
 [[package]]
 name = "rustc_version"
 version = "0.4.1"
@@ -2997,6 +3306,61 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "rustls"
+version = "0.23.23"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "47796c98c480fce5406ef69d1c76378375492c3b0a0de587be0c1d9feb12f395"
+dependencies = [
+ "once_cell",
+ "ring",
+ "rustls-pki-types",
+ "rustls-webpki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-native-certs"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
+dependencies = [
+ "openssl-probe",
+ "rustls-pki-types",
+ "schannel",
+ "security-framework",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50"
+dependencies = [
+ "rustls-pki-types",
+]
+
+[[package]]
+name = "rustls-pki-types"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
+dependencies = [
+ "web-time",
+]
+
+[[package]]
+name = "rustls-webpki"
+version = "0.102.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
+dependencies = [
+ "ring",
+ "rustls-pki-types",
+ "untrusted",
+]
+
 [[package]]
 name = "rustversion"
 version = "1.0.19"
@@ -3018,12 +3382,44 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "schannel"
+version = "0.1.27"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d"
+dependencies = [
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
 
+[[package]]
+name = "security-framework"
+version = "3.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
+dependencies = [
+ "bitflags 2.9.0",
+ "core-foundation",
+ "core-foundation-sys",
+ "libc",
+ "security-framework-sys",
+]
+
+[[package]]
+name = "security-framework-sys"
+version = "2.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
 [[package]]
 name = "semver"
 version = "1.0.25"
@@ -3078,6 +3474,18 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_urlencoded"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+dependencies = [
+ "form_urlencoded",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
 [[package]]
 name = "serde_yaml"
 version = "0.9.34+deprecated"
@@ -3168,6 +3576,16 @@ version = "1.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
+[[package]]
+name = "socket2"
+version = "0.5.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "sqlparser"
 version = "0.54.0"
@@ -3272,6 +3690,15 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "sync_wrapper"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
+dependencies = [
+ "futures-core",
+]
+
 [[package]]
 name = "synstructure"
 version = "0.13.1"
@@ -3303,7 +3730,16 @@ version = "1.0.69"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
 dependencies = [
- "thiserror-impl",
+ "thiserror-impl 1.0.69",
+]
+
+[[package]]
+name = "thiserror"
+version = "2.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
+dependencies = [
+ "thiserror-impl 2.0.12",
 ]
 
 [[package]]
@@ -3317,6 +3753,17 @@ dependencies = [
  "syn 2.0.98",
 ]
 
+[[package]]
+name = "thiserror-impl"
+version = "2.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.98",
+]
+
 [[package]]
 name = "thread-id"
 version = "4.2.2"
@@ -3367,6 +3814,21 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "tinyvec"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71"
+dependencies = [
+ "tinyvec_macros",
+]
+
+[[package]]
+name = "tinyvec_macros"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
+
 [[package]]
 name = "tokio"
 version = "1.43.0"
@@ -3375,9 +3837,13 @@ checksum = 
"3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
 dependencies = [
  "backtrace",
  "bytes",
+ "libc",
+ "mio",
  "parking_lot",
  "pin-project-lite",
+ "socket2",
  "tokio-macros",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -3391,6 +3857,56 @@ dependencies = [
  "syn 2.0.98",
 ]
 
+[[package]]
+name = "tokio-rustls"
+version = "0.26.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
+dependencies = [
+ "rustls",
+ "tokio",
+]
+
+[[package]]
+name = "tokio-util"
+version = "0.7.13"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
+name = "tower"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "pin-project-lite",
+ "sync_wrapper",
+ "tokio",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
+
+[[package]]
+name = "tower-service"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
 [[package]]
 name = "tracing"
 version = "0.1.41"
@@ -3422,6 +3938,12 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "try-lock"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
+
 [[package]]
 name = "twox-hash"
 version = "1.6.3"
@@ -3489,6 +4011,12 @@ version = "0.2.11"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
 
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
 [[package]]
 name = "url"
 version = "2.5.4"
@@ -3539,6 +4067,15 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "want"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
+dependencies = [
+ "try-lock",
+]
+
 [[package]]
 name = "wasi"
 version = "0.11.0+wasi-snapshot-preview1"
@@ -3580,6 +4117,19 @@ dependencies = [
  "wasm-bindgen-shared",
 ]
 
+[[package]]
+name = "wasm-bindgen-futures"
+version = "0.4.50"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61"
+dependencies = [
+ "cfg-if",
+ "js-sys",
+ "once_cell",
+ "wasm-bindgen",
+ "web-sys",
+]
+
 [[package]]
 name = "wasm-bindgen-macro"
 version = "0.2.100"
@@ -3612,6 +4162,19 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "wasm-streams"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "web-sys"
 version = "0.3.77"
@@ -3684,6 +4247,36 @@ dependencies = [
  "windows-targets 0.52.6",
 ]
 
+[[package]]
+name = "windows-registry"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0"
+dependencies = [
+ "windows-result",
+ "windows-strings",
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-result"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e"
+dependencies = [
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-strings"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10"
+dependencies = [
+ "windows-result",
+ "windows-targets 0.52.6",
+]
+
 [[package]]
 name = "windows-sys"
 version = "0.45.0"
@@ -3693,6 +4286,15 @@ dependencies = [
  "windows-targets 0.42.2",
 ]
 
+[[package]]
+name = "windows-sys"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
+dependencies = [
+ "windows-targets 0.52.6",
+]
+
 [[package]]
 name = "windows-sys"
 version = "0.59.0"
@@ -3910,6 +4512,12 @@ dependencies = [
  "synstructure",
 ]
 
+[[package]]
+name = "zeroize"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
+
 [[package]]
 name = "zerovec"
 version = "0.10.4"
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 42d4124c0..acb4beb24 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -60,7 +60,7 @@ num = "0.4"
 rand = "0.8"
 regex = "1.9.6"
 thiserror = "1"
-object_store = "0.11.0"
+object_store = { version = "0.11.0", features = ["gcp", "azure", "aws", 
"http"] }
 url = "2.2"
 
 [profile.release]
diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index bb1687607..cb8cb104b 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -74,7 +74,7 @@ use 
datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
 
 use crate::execution::shuffle::CompressionCodec;
 use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::{register_object_store, 
SparkParquetOptions};
+use crate::parquet::parquet_support::{prepare_object_store, 
SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
@@ -1208,7 +1208,17 @@ impl PhysicalPlanner {
 
                 // By default, local FS object store registered
                 // if `hdfs` feature enabled then HDFS file object store 
registered
-                let object_store_url = 
register_object_store(Arc::clone(&self.session_ctx))?;
+                // Get one file from the list of files
+                let one_file = scan
+                    .file_partitions
+                    .first()
+                    .and_then(|f| f.partitioned_file.first())
+                    .map(|f| f.file_path.clone())
+                    .ok_or(ExecutionError::GeneralError(
+                        "Failed to locate file".to_string(),
+                    ))?;
+                let (object_store_url, _) =
+                    prepare_object_store(self.session_ctx.runtime_env(), 
one_file)?;
 
                 // Generate file groups
                 let mut file_groups: Vec<Vec<PartitionedFile>> =
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 5f51d6273..9c2513b44 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -46,7 +46,7 @@ use self::util::jni::TypePromotionInfo;
 use crate::execution::operators::ExecutionError;
 use crate::execution::utils::SparkArrowConvert;
 use crate::parquet::data_type::AsBytes;
-use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::parquet_support::{prepare_object_store, 
SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow_array::{Array, RecordBatch};
@@ -54,6 +54,7 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -61,7 +62,7 @@ use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, 
JString, ReleaseMode};
 use jni::sys::jstring;
 use read::ColumnReader;
-use util::jni::{convert_column_descriptor, convert_encoding, 
deserialize_schema, get_file_path};
+use util::jni::{convert_column_descriptor, convert_encoding, 
deserialize_schema};
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
@@ -649,11 +650,11 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_parquet_Native_initRecordBat
         let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()
             .build()?;
+        let session_ctx = SessionContext::new();
+        let (object_store_url, object_store_path) =
+            prepare_object_store(session_ctx.runtime_env(), path.clone())?;
 
         // EXPERIMENTAL - BEGIN
-        //TODO: Need an execution context and a spark plan equivalent so that 
we can reuse
-        // code from jni_api.rs
-        let (object_store_url, object_store_path) = 
get_file_path(path.clone()).unwrap();
         // TODO: (ARROW NATIVE) - Remove code duplication between this and POC 
1
         // copy the input on-heap buffer to native
         let required_schema_array = JByteArray::from_raw(required_schema);
diff --git a/native/core/src/parquet/parquet_support.rs 
b/native/core/src/parquet/parquet_support.rs
index 393265204..7755af76d 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -23,14 +23,17 @@ use arrow::{
 };
 use arrow_array::{DictionaryArray, StructArray};
 use arrow_schema::DataType;
-use datafusion::prelude::SessionContext;
 use datafusion_comet_spark_expr::utils::array_with_timezone;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_common::{Result as DataFusionResult, ScalarValue};
 use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_expr::ColumnarValue;
+use object_store::path::Path;
+use object_store::{parse_url, ObjectStore};
 use std::collections::HashMap;
 use std::{fmt::Debug, hash::Hash, sync::Arc};
+use url::Url;
 
 static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
 
@@ -199,38 +202,136 @@ fn cast_struct_to_struct(
     }
 }
 
-// Default object store which is local filesystem
+// Mirrors object_store::parse::parse_url for the hdfs object store
+#[cfg(feature = "hdfs")]
+fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), 
object_store::Error> {
+    match 
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
+    {
+        Some(object_store) => {
+            let path = object_store.get_path(url.as_str());
+            Ok((Box::new(object_store), path))
+        }
+        _ => {
+            return Err(object_store::Error::Generic {
+                store: "HadoopFileSystem",
+                source: "Could not create hdfs object store".into(),
+            });
+        }
+    }
+}
+
 #[cfg(not(feature = "hdfs"))]
-pub(crate) fn register_object_store(
-    session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
-    let object_store = object_store::local::LocalFileSystem::new();
-    let url = ObjectStoreUrl::parse("file://")?;
-    session_context
-        .runtime_env()
-        .register_object_store(url.as_ref(), Arc::new(object_store));
-    Ok(url)
+fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), 
object_store::Error> {
+    Err(object_store::Error::Generic {
+        store: "HadoopFileSystem",
+        source: "Hdfs support is not enabled in this build".into(),
+    })
 }
 
-// HDFS object store
-#[cfg(feature = "hdfs")]
-pub(crate) fn register_object_store(
-    session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
-    // TODO: read the namenode configuration from file schema or from 
spark.defaultFS
-    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
-    if let Some(object_store) =
-        
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
-    {
-        session_context
-            .runtime_env()
-            .register_object_store(url.as_ref(), Arc::new(object_store));
+/// Parses the url, registers the object store, and returns a tuple of the 
object store url and object store path
+pub(crate) fn prepare_object_store(
+    runtime_env: Arc<RuntimeEnv>,
+    url: String,
+) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+    let mut url = Url::parse(url.as_str())
+        .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL 
{url}: {e}")))?;
+    let mut scheme = url.scheme();
+    if scheme == "s3a" {
+        scheme = "s3";
+        url.set_scheme("s3").map_err(|_| {
+            ExecutionError::GeneralError("Could not convert scheme from s3a to 
s3".to_string())
+        })?;
+    }
+    let url_key = format!(
+        "{}://{}",
+        scheme,
+        &url[url::Position::BeforeHost..url::Position::AfterPort],
+    );
 
-        return Ok(url);
+    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if 
scheme == "hdfs" {
+        parse_hdfs_url(&url)
+    } else {
+        parse_url(&url)
     }
+    .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+
+    let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
+    runtime_env.register_object_store(&url, Arc::from(object_store));
+    Ok((object_store_url, object_store_path))
+}
 
-    Err(ExecutionError::GeneralError(format!(
-        "HDFS object store cannot be created for {}",
-        url
-    )))
+#[cfg(test)]
+mod tests {
+    use crate::execution::operators::ExecutionError;
+    use crate::parquet::parquet_support::prepare_object_store;
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use object_store::path::Path;
+    use std::sync::Arc;
+    use url::Url;
+
+    #[cfg(not(feature = "hdfs"))]
+    #[test]
+    fn test_prepare_object_store() {
+        let local_file_system_url = 
"file:///comet/spark-warehouse/part-00000.snappy.parquet";
+        let s3_url = 
"s3a://test_bucket/comet/spark-warehouse/part-00000.snappy.parquet";
+        let hdfs_url = 
"hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet";
+
+        let all_urls = [local_file_system_url, s3_url, hdfs_url];
+        let expected: Vec<Result<(ObjectStoreUrl, Path), ExecutionError>> = 
vec![
+            Ok((
+                ObjectStoreUrl::parse("file://").unwrap(),
+                Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
+            )),
+            Ok((
+                ObjectStoreUrl::parse("s3://test_bucket").unwrap(),
+                Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
+            )),
+            Err(ExecutionError::GeneralError(
+                "Generic HadoopFileSystem error: Hdfs support is not enabled 
in this build"
+                    .parse()
+                    .unwrap(),
+            )),
+        ];
+
+        for (i, url_str) in all_urls.iter().enumerate() {
+            let url = &Url::parse(url_str).unwrap();
+            let res = prepare_object_store(Arc::new(RuntimeEnv::default()), 
url.to_string());
+
+            let expected = expected.get(i).unwrap();
+            match expected {
+                Ok((o, p)) => {
+                    let (r_o, r_p) = res.unwrap();
+                    assert_eq!(r_o, *o);
+                    assert_eq!(r_p, *p);
+                }
+                Err(e) => {
+                    assert!(res.is_err());
+                    let Err(res_e) = res else {
+                        panic!("test failed")
+                    };
+                    assert_eq!(e.to_string(), res_e.to_string())
+                }
+            }
+        }
+    }
+
+    #[test]
+    #[cfg(feature = "hdfs")]
+    fn test_prepare_object_store() {
+        // we use a local file system url instead of an hdfs url because the 
latter requires
+        // a running namenode
+        let hdfs_url = 
"file:///comet/spark-warehouse/part-00000.snappy.parquet";
+        let expected: (ObjectStoreUrl, Path) = (
+            ObjectStoreUrl::parse("file://").unwrap(),
+            Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
+        );
+
+        let url = &Url::parse(hdfs_url).unwrap();
+        let res = prepare_object_store(Arc::new(RuntimeEnv::default()), 
url.to_string());
+
+        let res = res.unwrap();
+        assert_eq!(res.0, expected.0);
+        assert_eq!(res.1, expected.1);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to