This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 197921811 fix: use common implementation of handling object store and
hdfs urls for native_datafusion and native_iceberg_compat (#1494)
197921811 is described below
commit 1979218115440a27dd677d604fe4a29f37e792b2
Author: Parth Chandra <[email protected]>
AuthorDate: Thu Mar 13 08:25:37 2025 -0700
fix: use common implementation of handling object store and hdfs urls for
native_datafusion and native_iceberg_compat (#1494)
* fix: use common implementation of handling object store and hdfs urls for
native_datafusion and native_iceberg_compat
* clippy
* remove commented code
* address review comments
* fix unit test for hdfs feature
---
native/Cargo.lock | 624 ++++++++++++++++++++++++++++-
native/Cargo.toml | 2 +-
native/core/src/execution/planner.rs | 14 +-
native/core/src/parquet/mod.rs | 11 +-
native/core/src/parquet/parquet_support.rs | 159 ++++++--
5 files changed, 765 insertions(+), 45 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index faa28b13c..845c4314c 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -359,6 +359,12 @@ dependencies = [
"num-traits",
]
+[[package]]
+name = "atomic-waker"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
+
[[package]]
name = "autocfg"
version = "1.4.0"
@@ -415,7 +421,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
- "rustc-hash",
+ "rustc-hash 1.1.0",
"shlex",
"syn 1.0.109",
"which",
@@ -547,6 +553,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+[[package]]
+name = "cfg_aliases"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
+
[[package]]
name = "chrono"
version = "0.4.39"
@@ -556,6 +568,7 @@ dependencies = [
"android-tzdata",
"iana-time-zone",
"num-traits",
+ "serde",
"windows-targets 0.52.6",
]
@@ -711,6 +724,16 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
+[[package]]
+name = "core-foundation"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@@ -992,7 +1015,7 @@ dependencies = [
"simd-adler32",
"snap",
"tempfile",
- "thiserror",
+ "thiserror 1.0.69",
"tokio",
"url",
"zstd 0.11.2+zstd.1.5.2",
@@ -1042,7 +1065,7 @@ dependencies = [
"parquet",
"rand",
"regex",
- "thiserror",
+ "thiserror 1.0.69",
"tokio",
"twox-hash 2.1.0",
]
@@ -1711,8 +1734,10 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
+ "js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
+ "wasm-bindgen",
]
[[package]]
@@ -1739,6 +1764,25 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
+[[package]]
+name = "h2"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
+dependencies = [
+ "atomic-waker",
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "http",
+ "indexmap",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
[[package]]
name = "half"
version = "2.4.1"
@@ -1802,12 +1846,109 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "http"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
+[[package]]
+name = "http-body"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
+dependencies = [
+ "bytes",
+ "http",
+]
+
+[[package]]
+name = "http-body-util"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "httparse"
+version = "1.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
+
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
+[[package]]
+name = "hyper"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "httparse",
+ "itoa",
+ "pin-project-lite",
+ "smallvec",
+ "tokio",
+ "want",
+]
+
+[[package]]
+name = "hyper-rustls"
+version = "0.27.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2"
+dependencies = [
+ "futures-util",
+ "http",
+ "hyper",
+ "hyper-util",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-pki-types",
+ "tokio",
+ "tokio-rustls",
+ "tower-service",
+]
+
+[[package]]
+name = "hyper-util"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "pin-project-lite",
+ "socket2",
+ "tokio",
+ "tower-service",
+ "tracing",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.61"
@@ -1993,7 +2134,7 @@ dependencies = [
"log",
"num-format",
"once_cell",
- "quick-xml",
+ "quick-xml 0.26.0",
"rgb",
"str_stack",
]
@@ -2004,6 +2145,12 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
+[[package]]
+name = "ipnet"
+version = "2.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
+
[[package]]
name = "is-terminal"
version = "0.4.15"
@@ -2079,7 +2226,7 @@ dependencies = [
"jni-sys",
"libloading 0.7.4",
"log",
- "thiserror",
+ "thiserror 1.0.69",
"walkdir",
"windows-sys 0.45.0",
]
@@ -2286,7 +2433,7 @@ dependencies = [
"serde-value",
"serde_json",
"serde_yaml",
- "thiserror",
+ "thiserror 1.0.69",
"thread-id",
"typemap-ors",
"winapi",
@@ -2335,6 +2482,12 @@ dependencies = [
"libmimalloc-sys",
]
+[[package]]
+name = "mime"
+version = "0.3.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
+
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -2350,6 +2503,17 @@ dependencies = [
"adler2",
]
+[[package]]
+name = "mio"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
+dependencies = [
+ "libc",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "multimap"
version = "0.8.3"
@@ -2477,13 +2641,24 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf"
dependencies = [
"async-trait",
+ "base64",
"bytes",
"chrono",
"futures",
+ "httparse",
"humantime",
+ "hyper",
"itertools 0.13.0",
+ "md-5",
"parking_lot",
"percent-encoding",
+ "quick-xml 0.37.2",
+ "rand",
+ "reqwest",
+ "ring",
+ "rustls-pemfile",
+ "serde",
+ "serde_json",
"snafu",
"tokio",
"tracing",
@@ -2503,6 +2678,12 @@ version = "11.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
+[[package]]
+name = "openssl-probe"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
+
[[package]]
name = "ordered-float"
version = "2.10.1"
@@ -2722,7 +2903,7 @@ dependencies = [
"smallvec",
"symbolic-demangle",
"tempfile",
- "thiserror",
+ "thiserror 1.0.69",
]
[[package]]
@@ -2837,6 +3018,68 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "quick-xml"
+version = "0.37.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "165859e9e55f79d67b96c5d96f4e88b6f2695a1972849c15a6a3f5c59fc2c003"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quinn"
+version = "0.11.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef"
+dependencies = [
+ "bytes",
+ "pin-project-lite",
+ "quinn-proto",
+ "quinn-udp",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "socket2",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "quinn-proto"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d"
+dependencies = [
+ "bytes",
+ "getrandom 0.2.15",
+ "rand",
+ "ring",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "rustls-pki-types",
+ "slab",
+ "thiserror 2.0.12",
+ "tinyvec",
+ "tracing",
+ "web-time",
+]
+
+[[package]]
+name = "quinn-udp"
+version = "0.5.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944"
+dependencies = [
+ "cfg_aliases",
+ "libc",
+ "once_cell",
+ "socket2",
+ "tracing",
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "quote"
version = "1.0.38"
@@ -2954,6 +3197,52 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
+[[package]]
+name = "reqwest"
+version = "0.12.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da"
+dependencies = [
+ "base64",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "http-body-util",
+ "hyper",
+ "hyper-rustls",
+ "hyper-util",
+ "ipnet",
+ "js-sys",
+ "log",
+ "mime",
+ "once_cell",
+ "percent-encoding",
+ "pin-project-lite",
+ "quinn",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-pemfile",
+ "rustls-pki-types",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "sync_wrapper",
+ "tokio",
+ "tokio-rustls",
+ "tokio-util",
+ "tower",
+ "tower-service",
+ "url",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "wasm-streams",
+ "web-sys",
+ "windows-registry",
+]
+
[[package]]
name = "rgb"
version = "0.8.50"
@@ -2963,6 +3252,20 @@ dependencies = [
"bytemuck",
]
+[[package]]
+name = "ring"
+version = "0.17.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "getrandom 0.2.15",
+ "libc",
+ "untrusted",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@@ -2975,6 +3278,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
+[[package]]
+name = "rustc-hash"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
+
[[package]]
name = "rustc_version"
version = "0.4.1"
@@ -2997,6 +3306,61 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "rustls"
+version = "0.23.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "47796c98c480fce5406ef69d1c76378375492c3b0a0de587be0c1d9feb12f395"
+dependencies = [
+ "once_cell",
+ "ring",
+ "rustls-pki-types",
+ "rustls-webpki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-native-certs"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
+dependencies = [
+ "openssl-probe",
+ "rustls-pki-types",
+ "schannel",
+ "security-framework",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50"
+dependencies = [
+ "rustls-pki-types",
+]
+
+[[package]]
+name = "rustls-pki-types"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
+dependencies = [
+ "web-time",
+]
+
+[[package]]
+name = "rustls-webpki"
+version = "0.102.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
+dependencies = [
+ "ring",
+ "rustls-pki-types",
+ "untrusted",
+]
+
[[package]]
name = "rustversion"
version = "1.0.19"
@@ -3018,12 +3382,44 @@ dependencies = [
"winapi-util",
]
+[[package]]
+name = "schannel"
+version = "0.1.27"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d"
+dependencies = [
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+[[package]]
+name = "security-framework"
+version = "3.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
+dependencies = [
+ "bitflags 2.9.0",
+ "core-foundation",
+ "core-foundation-sys",
+ "libc",
+ "security-framework-sys",
+]
+
+[[package]]
+name = "security-framework-sys"
+version = "2.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
[[package]]
name = "semver"
version = "1.0.25"
@@ -3078,6 +3474,18 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serde_urlencoded"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+dependencies = [
+ "form_urlencoded",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
[[package]]
name = "serde_yaml"
version = "0.9.34+deprecated"
@@ -3168,6 +3576,16 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
+[[package]]
+name = "socket2"
+version = "0.5.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "sqlparser"
version = "0.54.0"
@@ -3272,6 +3690,15 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "sync_wrapper"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
+dependencies = [
+ "futures-core",
+]
+
[[package]]
name = "synstructure"
version = "0.13.1"
@@ -3303,7 +3730,16 @@ version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
- "thiserror-impl",
+ "thiserror-impl 1.0.69",
+]
+
+[[package]]
+name = "thiserror"
+version = "2.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
+dependencies = [
+ "thiserror-impl 2.0.12",
]
[[package]]
@@ -3317,6 +3753,17 @@ dependencies = [
"syn 2.0.98",
]
+[[package]]
+name = "thiserror-impl"
+version = "2.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.98",
+]
+
[[package]]
name = "thread-id"
version = "4.2.2"
@@ -3367,6 +3814,21 @@ dependencies = [
"serde_json",
]
+[[package]]
+name = "tinyvec"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71"
+dependencies = [
+ "tinyvec_macros",
+]
+
+[[package]]
+name = "tinyvec_macros"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
+
[[package]]
name = "tokio"
version = "1.43.0"
@@ -3375,9 +3837,13 @@ checksum =
"3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
dependencies = [
"backtrace",
"bytes",
+ "libc",
+ "mio",
"parking_lot",
"pin-project-lite",
+ "socket2",
"tokio-macros",
+ "windows-sys 0.52.0",
]
[[package]]
@@ -3391,6 +3857,56 @@ dependencies = [
"syn 2.0.98",
]
+[[package]]
+name = "tokio-rustls"
+version = "0.26.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
+dependencies = [
+ "rustls",
+ "tokio",
+]
+
+[[package]]
+name = "tokio-util"
+version = "0.7.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
+name = "tower"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "pin-project-lite",
+ "sync_wrapper",
+ "tokio",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
+
+[[package]]
+name = "tower-service"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
[[package]]
name = "tracing"
version = "0.1.41"
@@ -3422,6 +3938,12 @@ dependencies = [
"once_cell",
]
+[[package]]
+name = "try-lock"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
+
[[package]]
name = "twox-hash"
version = "1.6.3"
@@ -3489,6 +4011,12 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
[[package]]
name = "url"
version = "2.5.4"
@@ -3539,6 +4067,15 @@ dependencies = [
"winapi-util",
]
+[[package]]
+name = "want"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
+dependencies = [
+ "try-lock",
+]
+
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@@ -3580,6 +4117,19 @@ dependencies = [
"wasm-bindgen-shared",
]
+[[package]]
+name = "wasm-bindgen-futures"
+version = "0.4.50"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61"
+dependencies = [
+ "cfg-if",
+ "js-sys",
+ "once_cell",
+ "wasm-bindgen",
+ "web-sys",
+]
+
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
@@ -3612,6 +4162,19 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "wasm-streams"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
[[package]]
name = "web-sys"
version = "0.3.77"
@@ -3684,6 +4247,36 @@ dependencies = [
"windows-targets 0.52.6",
]
+[[package]]
+name = "windows-registry"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0"
+dependencies = [
+ "windows-result",
+ "windows-strings",
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-result"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e"
+dependencies = [
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-strings"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10"
+dependencies = [
+ "windows-result",
+ "windows-targets 0.52.6",
+]
+
[[package]]
name = "windows-sys"
version = "0.45.0"
@@ -3693,6 +4286,15 @@ dependencies = [
"windows-targets 0.42.2",
]
+[[package]]
+name = "windows-sys"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
+dependencies = [
+ "windows-targets 0.52.6",
+]
+
[[package]]
name = "windows-sys"
version = "0.59.0"
@@ -3910,6 +4512,12 @@ dependencies = [
"synstructure",
]
+[[package]]
+name = "zeroize"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
+
[[package]]
name = "zerovec"
version = "0.10.4"
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 42d4124c0..acb4beb24 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -60,7 +60,7 @@ num = "0.4"
rand = "0.8"
regex = "1.9.6"
thiserror = "1"
-object_store = "0.11.0"
+object_store = { version = "0.11.0", features = ["gcp", "azure", "aws",
"http"] }
url = "2.2"
[profile.release]
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index bb1687607..cb8cb104b 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -74,7 +74,7 @@ use
datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::{register_object_store,
SparkParquetOptions};
+use crate::parquet::parquet_support::{prepare_object_store,
SparkParquetOptions};
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
@@ -1208,7 +1208,17 @@ impl PhysicalPlanner {
// By default, local FS object store registered
// if `hdfs` feature enabled then HDFS file object store
registered
- let object_store_url =
register_object_store(Arc::clone(&self.session_ctx))?;
+ // Get one file from the list of files
+ let one_file = scan
+ .file_partitions
+ .first()
+ .and_then(|f| f.partitioned_file.first())
+ .map(|f| f.file_path.clone())
+ .ok_or(ExecutionError::GeneralError(
+ "Failed to locate file".to_string(),
+ ))?;
+ let (object_store_url, _) =
+ prepare_object_store(self.session_ctx.runtime_env(),
one_file)?;
// Generate file groups
let mut file_groups: Vec<Vec<PartitionedFile>> =
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 5f51d6273..9c2513b44 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -46,7 +46,7 @@ use self::util::jni::TypePromotionInfo;
use crate::execution::operators::ExecutionError;
use crate::execution::utils::SparkArrowConvert;
use crate::parquet::data_type::AsBytes;
-use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::parquet_support::{prepare_object_store,
SparkParquetOptions};
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow_array::{Array, RecordBatch};
@@ -54,6 +54,7 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_common::config::TableParquetOptions;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -61,7 +62,7 @@ use futures::{poll, StreamExt};
use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray,
JString, ReleaseMode};
use jni::sys::jstring;
use read::ColumnReader;
-use util::jni::{convert_column_descriptor, convert_encoding,
deserialize_schema, get_file_path};
+use util::jni::{convert_column_descriptor, convert_encoding,
deserialize_schema};
/// Parquet read context maintained across multiple JNI calls.
struct Context {
pub column_reader: ColumnReader,
@@ -649,11 +650,11 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_initRecordBat
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
+ let session_ctx = SessionContext::new();
+ let (object_store_url, object_store_path) =
+ prepare_object_store(session_ctx.runtime_env(), path.clone())?;
// EXPERIMENTAL - BEGIN
- //TODO: Need an execution context and a spark plan equivalent so that
we can reuse
- // code from jni_api.rs
- let (object_store_url, object_store_path) =
get_file_path(path.clone()).unwrap();
// TODO: (ARROW NATIVE) - Remove code duplication between this and POC
1
// copy the input on-heap buffer to native
let required_schema_array = JByteArray::from_raw(required_schema);
diff --git a/native/core/src/parquet/parquet_support.rs
b/native/core/src/parquet/parquet_support.rs
index 393265204..7755af76d 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -23,14 +23,17 @@ use arrow::{
};
use arrow_array::{DictionaryArray, StructArray};
use arrow_schema::DataType;
-use datafusion::prelude::SessionContext;
use datafusion_comet_spark_expr::utils::array_with_timezone;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_common::{Result as DataFusionResult, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::ColumnarValue;
+use object_store::path::Path;
+use object_store::{parse_url, ObjectStore};
use std::collections::HashMap;
use std::{fmt::Debug, hash::Hash, sync::Arc};
+use url::Url;
static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
@@ -199,38 +202,136 @@ fn cast_struct_to_struct(
}
}
-// Default object store which is local filesystem
+/// Builds the HDFS object store for `url` and returns it together with the object store
+/// path of `url`; mirrors `object_store::parse_url` for the `hdfs` scheme.
+///
+/// Returns `object_store::Error::Generic` when `HadoopFileSystem::new` cannot create a
+/// store for the url.
+#[cfg(feature = "hdfs")]
+fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    use datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem;
+
+    let object_store =
+        HadoopFileSystem::new(url.as_ref()).ok_or_else(|| object_store::Error::Generic {
+            store: "HadoopFileSystem",
+            source: "Could not create hdfs object store".into(),
+        })?;
+    let path = object_store.get_path(url.as_str());
+    Ok((Box::new(object_store), path))
+}
+
#[cfg(not(feature = "hdfs"))]
-pub(crate) fn register_object_store(
- session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
- let object_store = object_store::local::LocalFileSystem::new();
- let url = ObjectStoreUrl::parse("file://")?;
- session_context
- .runtime_env()
- .register_object_store(url.as_ref(), Arc::new(object_store));
- Ok(url)
+/// Stand-in for the `hdfs`-feature `parse_hdfs_url` when the crate is built without that
+/// feature: it never builds a store and always returns an `object_store::Error::Generic`
+/// stating that HDFS support is not enabled in this build.
+fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path),
object_store::Error> {
+ Err(object_store::Error::Generic {
+ store: "HadoopFileSystem",
+ source: "Hdfs support is not enabled in this build".into(),
+ })
}
-// HDFS object store
-#[cfg(feature = "hdfs")]
-pub(crate) fn register_object_store(
- session_context: Arc<SessionContext>,
-) -> Result<ObjectStoreUrl, ExecutionError> {
- // TODO: read the namenode configuration from file schema or from
spark.defaultFS
- let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
- if let Some(object_store) =
-
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
- {
- session_context
- .runtime_env()
- .register_object_store(url.as_ref(), Arc::new(object_store));
+/// Parses `url`, registers an object store for it with `runtime_env`, and returns the
+/// object store url (`scheme://host[:port]`) together with the object store path of `url`.
+///
+/// An `s3a://` url is rewritten to `s3://` first, so both the returned key and the
+/// registration use the `s3` scheme. `hdfs://` urls are handled by `parse_hdfs_url`
+/// (which fails unless the `hdfs` feature is enabled); every other scheme goes through
+/// `object_store::parse_url`.
+///
+/// # Errors
+/// Returns `ExecutionError::GeneralError` when `url` cannot be parsed, the `s3a` scheme
+/// cannot be rewritten, or no object store can be created for the url; an error from
+/// `ObjectStoreUrl::parse` is propagated with `?`.
+pub(crate) fn prepare_object_store(
+    runtime_env: Arc<RuntimeEnv>,
+    url: String,
+) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+    // The closure still sees the `url: String` argument: the shadowing `url` binding below
+    // is not in scope until this statement completes.
+    let mut url = Url::parse(url.as_str())
+        .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
+    let mut scheme = url.scheme();
+    if scheme == "s3a" {
+        scheme = "s3";
+        url.set_scheme("s3").map_err(|_| {
+            ExecutionError::GeneralError("Could not convert scheme from s3a to s3".to_string())
+        })?;
+    }
+    // Key for the returned ObjectStoreUrl: scheme plus the host/port part of the url.
+    let url_key = format!(
+        "{}://{}",
+        scheme,
+        &url[url::Position::BeforeHost..url::Position::AfterPort],
+    );
- return Ok(url);
+
+    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
+        parse_hdfs_url(&url)
+    } else {
+        parse_url(&url)
}
+    .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+
+    // `url_key` is not used afterwards, so it is moved instead of cloned.
+    let object_store_url = ObjectStoreUrl::parse(url_key)?;
+    runtime_env.register_object_store(&url, Arc::from(object_store));
+    Ok((object_store_url, object_store_path))
+}
- Err(ExecutionError::GeneralError(format!(
- "HDFS object store cannot be created for {}",
- url
- )))
+#[cfg(test)]
+mod tests {
+    use crate::parquet::parquet_support::prepare_object_store;
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use object_store::path::Path;
+    use std::sync::Arc;
+    use url::Url;
+
+    /// Without the `hdfs` feature, local and s3a urls resolve to an object store while an
+    /// hdfs url is rejected with the "not enabled" error.
+    #[cfg(not(feature = "hdfs"))]
+    #[test]
+    fn test_prepare_object_store() {
+        // Imported here because only this (non-hdfs) test uses it; a module-level import
+        // would be unused when building tests with the `hdfs` feature.
+        use crate::execution::operators::ExecutionError;
+
+        let local_file_system_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet";
+        let s3_url = "s3a://test_bucket/comet/spark-warehouse/part-00000.snappy.parquet";
+        let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet";
+
+        let cases: Vec<(&str, Result<(ObjectStoreUrl, Path), ExecutionError>)> = vec![
+            (
+                local_file_system_url,
+                Ok((
+                    ObjectStoreUrl::parse("file://").unwrap(),
+                    Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
+                )),
+            ),
+            (
+                s3_url,
+                Ok((
+                    ObjectStoreUrl::parse("s3://test_bucket").unwrap(),
+                    Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
+                )),
+            ),
+            (
+                hdfs_url,
+                Err(ExecutionError::GeneralError(
+                    "Generic HadoopFileSystem error: Hdfs support is not enabled in this build"
+                        .to_string(),
+                )),
+            ),
+        ];
+
+        for (url_str, expected) in cases {
+            let url = Url::parse(url_str).unwrap();
+            let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string());
+
+            match (expected, res) {
+                (Ok((o, p)), Ok((r_o, r_p))) => {
+                    assert_eq!(r_o, o);
+                    assert_eq!(r_p, p);
+                }
+                (Err(e), Err(res_e)) => assert_eq!(e.to_string(), res_e.to_string()),
+                (expected, res) => panic!("{url_str}: expected {expected:?}, got {res:?}"),
+            }
+        }
+    }
+
+    /// With the `hdfs` feature a `file://` url is used instead of an `hdfs://` url, because
+    /// the latter requires a running namenode.
+    #[test]
+    #[cfg(feature = "hdfs")]
+    fn test_prepare_object_store() {
+        let local_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet";
+        let url = Url::parse(local_url).unwrap();
+        let (object_store_url, path) =
+            prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string()).unwrap();
+
+        assert_eq!(object_store_url, ObjectStoreUrl::parse("file://").unwrap());
+        assert_eq!(
+            path,
+            Path::from("/comet/spark-warehouse/part-00000.snappy.parquet")
+        );
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]