This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7878f0d01 fix: cache object stores and bucket regions to reduce DNS
query volume (#3802)
7878f0d01 is described below
commit 7878f0d01b0411f1a9442a5205649481df4b1257
Author: Andy Grove <[email protected]>
AuthorDate: Tue Mar 31 12:29:29 2026 -0600
fix: cache object stores and bucket regions to reduce DNS query volume
(#3802)
---
native/core/src/parquet/objectstore/s3.rs | 42 +++++++++++++
native/core/src/parquet/parquet_support.rs | 99 +++++++++++++++++++++++++++---
2 files changed, 132 insertions(+), 9 deletions(-)
diff --git a/native/core/src/parquet/objectstore/s3.rs
b/native/core/src/parquet/objectstore/s3.rs
index 6d8f011d0..552276298 100644
--- a/native/core/src/parquet/objectstore/s3.rs
+++ b/native/core/src/parquet/objectstore/s3.rs
@@ -17,6 +17,7 @@
use log::{debug, error};
use std::collections::HashMap;
+use std::sync::OnceLock;
use url::Url;
use crate::execution::jni_api::get_runtime;
@@ -111,13 +112,48 @@ pub fn create_store(
Ok((Box::new(object_store), path))
}
+/// Process-wide cache of resolved S3 bucket regions, keyed by bucket name.
+///
+/// ## Why static / process lifetime?
+///
+/// See the equivalent rationale on `object_store_cache` in `parquet_support.rs`: the JNI
+/// call site creates a new `RuntimeEnv` per file, leaving the executor process as the only
+/// available scope for cross-call state. In the standard Spark-on-Kubernetes deployment
+/// model each executor is dedicated to a single application, so process and application
+/// lifetimes are equivalent.
+///
+/// ## Unbounded size
+///
+/// A Spark job accesses a bounded, typically small set of S3 buckets, so the number of
+/// entries stays proportional to the number of distinct buckets. Entries are just
+/// `(String, String)` pairs and the set does not grow beyond what the job actually touches.
+///
+/// ## Invalidation
+///
+/// An S3 bucket's region is permanently fixed at creation time and cannot change; no
+/// invalidation is therefore needed. This is what makes a static, never-evicting cache
+/// safe here and on the equivalent region-resolution path inside the `object_store` crate.
+fn region_cache() -> &'static RwLock<HashMap<String, String>> {
+ static CACHE: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new();
+ CACHE.get_or_init(|| RwLock::new(HashMap::new()))
+}
+
/// Get the bucket region using the [HeadBucket API]. This will fail if the bucket does not exist.
+/// Results are cached per bucket to avoid redundant network calls.
///
/// [HeadBucket API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
///
/// TODO this is copied from the object store crate and has been adapted as a workaround
/// for https://github.com/apache/arrow-rs-object-store/issues/479
pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn
Error>> {
+ // Check cache first
+ if let Ok(cache) = region_cache().read() {
+ if let Some(region) = cache.get(bucket) {
+ debug!("Using cached region '{region}' for bucket '{bucket}'");
+ return Ok(region.clone());
+ }
+ }
+
let endpoint = format!("https://{bucket}.s3.amazonaws.com");
let client = reqwest::Client::new();
@@ -142,6 +178,12 @@ pub async fn resolve_bucket_region(bucket: &str) ->
Result<String, Box<dyn Error
.to_str()?
.to_string();
+ // Cache the resolved region
+ if let Ok(mut cache) = region_cache().write() {
+ debug!("Caching region '{region}' for bucket '{bucket}'");
+ cache.insert(bucket.to_string(), region.clone());
+ }
+
Ok(region)
}
diff --git a/native/core/src/parquet/parquet_support.rs
b/native/core/src/parquet/parquet_support.rs
index e7ff5630f..3418a17c4 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -35,10 +35,13 @@ use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::ColumnarValue;
use datafusion_comet_spark_expr::EvalMode;
+use log::debug;
use object_store::path::Path;
use object_store::{parse_url, ObjectStore};
use std::collections::HashMap;
+use std::sync::OnceLock;
use std::time::Duration;
+use std::{collections::hash_map::DefaultHasher, hash::Hasher, sync::RwLock};
use std::{fmt::Debug, hash::Hash, sync::Arc};
use url::Url;
@@ -444,6 +447,56 @@ fn create_hdfs_object_store(
})
}
+type ObjectStoreCache = RwLock<HashMap<(String, u64), Arc<dyn ObjectStore>>>;
+
+/// Process-wide cache of object stores, keyed by `(scheme://host:port, config_hash)`.
+///
+/// ## Why static / process lifetime?
+///
+/// Comet's JNI architecture calls `initRecordBatchReader` once per Parquet file, and each
+/// call constructs a fresh `RuntimeEnv`. There is therefore no executor-scoped Rust object
+/// with a lifetime longer than a single file read that could own this cache. The executor
+/// process itself is the natural scope for HTTP connection-pool reuse, so process lifetime
+/// (i.e. `static`) is the appropriate choice here. In the standard Spark-on-Kubernetes
+/// deployment model each executor process is dedicated to a single Spark application, so
+/// process lifetime and application lifetime are equivalent; the cache is reclaimed when
+/// the executor pod terminates.
+///
+/// ## Unbounded size
+///
+/// Cache entries are indexed by `(scheme://host:port, hash-of-configs)`. A typical Spark
+/// job accesses a small, fixed set of buckets with a stable configuration, so the number of
+/// distinct keys is O(buckets × credential-configs) and remains small throughout the job.
+/// Entries are cheap relative to the cost of creating a new object store (new HTTP
+/// connection pool + DNS resolution), and there is no meaningful benefit from eviction, so
+/// no eviction policy is applied.
+///
+/// ## Credential invalidation
+///
+/// Object stores that use dynamic credentials (IMDS, WebIdentity, ECS role, STS assume-role)
+/// delegate credential refresh to a `CometCredentialProvider` that fetches fresh credentials
+/// on every request, so credential rotation is transparent and requires no cache
+/// invalidation. Object stores whose credentials are embedded in the Hadoop configuration
+/// (e.g. `fs.s3a.access.key` / `fs.s3a.secret.key`) produce a different `config_hash` when
+/// those values change, which causes a new store to be created and inserted under the new
+/// key; the old entry is harmlessly superseded.
+fn object_store_cache() -> &'static ObjectStoreCache {
+ static CACHE: OnceLock<ObjectStoreCache> = OnceLock::new();
+ CACHE.get_or_init(|| RwLock::new(HashMap::new()))
+}
+
+/// Compute a hash of the object store configuration for cache keying.
+fn hash_object_store_configs(configs: &HashMap<String, String>) -> u64 {
+ let mut hasher = DefaultHasher::new();
+ let mut keys: Vec<&String> = configs.keys().collect();
+ keys.sort();
+ for key in keys {
+ key.hash(&mut hasher);
+ configs[key].hash(&mut hasher);
+ }
+ hasher.finish()
+}
+
/// Parses the url, registers the object store with configurations, and returns a tuple of the object store url
/// and object store path
pub(crate) fn prepare_object_store_with_configs(
@@ -467,17 +520,45 @@ pub(crate) fn prepare_object_store_with_configs(
&url[url::Position::BeforeHost..url::Position::AfterPort],
);
- let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if
is_hdfs_scheme {
- create_hdfs_object_store(&url)
- } else if scheme == "s3" {
- objectstore::s3::create_store(&url, object_store_configs,
Duration::from_secs(300))
- } else {
- parse_url(&url)
- }
- .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+ let config_hash = hash_object_store_configs(object_store_configs);
+ let cache_key = (url_key.clone(), config_hash);
+
+ // Check the cache first to reuse existing object store instances.
+ // This enables HTTP connection pooling and avoids redundant DNS lookups.
+ let cached = {
+ let cache = object_store_cache()
+ .read()
+ .map_err(|e| ExecutionError::GeneralError(format!("Object store
cache error: {e}")))?;
+ cache.get(&cache_key).cloned()
+ };
+
+ let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) =
+ if let Some(store) = cached {
+ debug!("Reusing cached object store for {url_key}");
+ let path = Path::from_url_path(url.path())
+ .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+ (store, path)
+ } else {
+ debug!("Creating new object store for {url_key}");
+ let (store, path): (Box<dyn ObjectStore>, Path) = if
is_hdfs_scheme {
+ create_hdfs_object_store(&url)
+ } else if scheme == "s3" {
+ objectstore::s3::create_store(&url, object_store_configs,
Duration::from_secs(300))
+ } else {
+ parse_url(&url)
+ }
+ .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+
+ let store: Arc<dyn ObjectStore> = Arc::from(store);
+ // Insert into cache
+ if let Ok(mut cache) = object_store_cache().write() {
+ cache.insert(cache_key, Arc::clone(&store));
+ }
+ (store, path)
+ };
let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
- runtime_env.register_object_store(&url, Arc::from(object_store));
+ runtime_env.register_object_store(&url, object_store);
Ok((object_store_url, object_store_path))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]