karuppayya commented on code in PR #4309: URL: https://github.com/apache/datafusion-comet/pull/4309#discussion_r3283556553
########## spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JNI entry point invoked from native code to resolve a {@link CometS3CredentialProvider}. + * + * <p>Native code names a vendor class via the activation knob ({@code + * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code + * s3.comet.credential.provider.class} on a Spark catalog property for the Iceberg path) and a + * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg path, bucket name on + * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own instance, so two catalogs + * sharing one provider class get isolated state. 
+ */ +public final class CometS3CredentialDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class); + + private static final ConcurrentHashMap<InstanceKey, CometS3CredentialProvider> INSTANCES = + new ConcurrentHashMap<>(); + private static final CometS3AccessMode[] MODES = CometS3AccessMode.values(); + + private CometS3CredentialDispatcher() {} + + /** + * Reflects and initializes the named provider for {@code (FQCN, dispatchKey)} exactly once per + * JVM. Subsequent calls with the same key are no-ops. Native code invokes this synchronously when + * {@code CometS3CredentialBridge} is constructed at plan time, before any per-request {@link + * #getCredentialsForPath} call. {@code catalogProperties} carries the unfiltered FileIO property + * bag on the Iceberg path and is empty on the Parquet path. + */ + public static void ensureInitialized( + String providerClassName, String dispatchKey, Map<String, String> catalogProperties) { + if (providerClassName == null || providerClassName.isEmpty()) { + throw new IllegalArgumentException( + "providerClassName is empty; native side should not call without a configured class"); + } + InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? "" : dispatchKey); + Map<String, String> props = + catalogProperties == null ? Collections.emptyMap() : catalogProperties; + INSTANCES.computeIfAbsent( + key, + k -> { + CometS3CredentialProvider provider = instantiate(k.providerClassName); + provider.initialize(props); + return provider; + }); + } + + /** + * Invoked by native code on every per-request credential fetch. The instance must have been + * created by a prior {@link #ensureInitialized} call; otherwise this throws. {@code mode} is the + * {@link CometS3AccessMode} ordinal. 
+ */ + public static CometS3Credentials getCredentialsForPath( + String providerClassName, String dispatchKey, String bucket, String path, int mode) + throws Exception { + if (providerClassName == null || providerClassName.isEmpty()) { + throw new IllegalArgumentException( + "providerClassName is empty; native side should not call without a configured class"); + } + if (mode < 0 || mode >= MODES.length) { + throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " + mode); + } + InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? "" : dispatchKey); + CometS3CredentialProvider provider = INSTANCES.get(key); + if (provider == null) { + throw new IllegalStateException( + "CometS3CredentialProvider " + + providerClassName + + " (dispatchKey=" + + key.dispatchKey + + ") was not initialized; ensureInitialized must be called before" + + " getCredentialsForPath"); + } + CometS3AccessMode accessMode = MODES[mode]; + if (LOG.isDebugEnabled()) { + LOG.debug( + "Fetching credentials via {} (dispatchKey={}) for bucket={} path={} mode={}", + providerClassName, + key.dispatchKey, + bucket, + path, + accessMode); + } + return provider.getCredentialsForPath(bucket, path, accessMode); + } + + private static CometS3CredentialProvider instantiate(String providerClassName) { + Class<?> clazz; + try { + clazz = Class.forName(providerClassName); Review Comment: We need to move `org.apache.comet.iceberg.IcebergReflection.loadClass` out and use here ########## spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.util.Map; + +/** + * SPI for supplying AWS credentials to Comet's native S3 readers, which bypass Spark's Hadoop S3A + * code path. Vendors implement this when path-aware or vendor-managed credential mechanisms cannot + * be reached through the standard parameterless {@code AWSCredentialsProvider.getCredentials()} + * contract. + * + * <p>Vendors register an implementation by setting {@code + * spark.hadoop.fs.s3a.comet.credential.provider.class} (or the per-bucket form {@code + * spark.hadoop.fs.s3a.bucket.<name>.comet.credential.provider.class}) for the Parquet path, or + * {@code spark.sql.catalog.<catalog>.s3.comet.credential.provider.class} for the Iceberg path. The + * class must have a public no-arg constructor. + * + * <p>Comet keys provider instances by {@code (FQCN, dispatchKey)}, where {@code dispatchKey} is the + * Spark V2 catalog name on the Iceberg path and the bucket on the Parquet path. The first time a + * given key is seen on an executor, Comet reflects the class, calls {@link #initialize(Map)} once, + * and caches the instance. Two catalogs that share one FQCN therefore get isolated instances with + * their own {@code initialize} maps. + * + * <p>{@link #initialize(Map)} should be cheap and non-blocking; defer real credential fetches to + * the first {@link #getCredentialsForPath} call. {@link #getCredentialsForPath} may be invoked + * concurrently from many native worker threads, so implementations must be thread-safe. 
+ * + * <p>Comet does not maintain a TTL cache, broadcast catalog state, or schedule refresh. Vendors own + * caching, refresh, and any executor-side state distribution. Returns credentials or throws; there + * is no fall-through return value. See the user guide on S3 credential providers for the full + * contract and examples. + */ +public interface CometS3CredentialProvider { + + /** + * Called once per {@code (FQCN, dispatchKey)} on each executor before any {@link + * #getCredentialsForPath} call. The {@code catalogProperties} map carries the full FileIO + * property bag for the Iceberg path (including {@code credentials.uri}, OAuth tokens, vendor keys + * like {@code tenant-id}) and is empty on the Parquet path. The default no-op keeps Parquet + * vendors source-compatible. + * + * @param catalogProperties unfiltered FileIO/catalog properties; may contain secrets, do not log + */ + default void initialize(Map<String, String> catalogProperties) {} + + /** + * @param bucket S3 bucket name (no scheme, no path) + * @param path object key or prefix, leading slash included (matches the URL path component) + * @param mode access intent for this request + * @return non-null credentials; {@code null} is a contract violation + */ + CometS3Credentials getCredentialsForPath(String bucket, String path, CometS3AccessMode mode) Review Comment: Should we wrap (bucket, path, mode) in a context object? If we want to add a new field later, vendors would have to recompile their JARs to keep working; a context object could gain new fields without breaking existing implementations. ########## spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.util.Map; + +/** + * SPI for supplying AWS credentials to Comet's native S3 readers, which bypass Spark's Hadoop S3A + * code path. Vendors implement this when path-aware or vendor-managed credential mechanisms cannot + * be reached through the standard parameterless {@code AWSCredentialsProvider.getCredentials()} + * contract. + * + * <p>Vendors register an implementation by setting {@code + * spark.hadoop.fs.s3a.comet.credential.provider.class} (or the per-bucket form {@code + * spark.hadoop.fs.s3a.bucket.<name>.comet.credential.provider.class}) for the Parquet path, or + * {@code spark.sql.catalog.<catalog>.s3.comet.credential.provider.class} for the Iceberg path. The + * class must have a public no-arg constructor. + * + * <p>Comet keys provider instances by {@code (FQCN, dispatchKey)}, where {@code dispatchKey} is the + * Spark V2 catalog name on the Iceberg path and the bucket on the Parquet path. The first time a + * given key is seen on an executor, Comet reflects the class, calls {@link #initialize(Map)} once, + * and caches the instance. Two catalogs that share one FQCN therefore get isolated instances with + * their own {@code initialize} maps. + * + * <p>{@link #initialize(Map)} should be cheap and non-blocking; defer real credential fetches to + * the first {@link #getCredentialsForPath} call. 
{@link #getCredentialsForPath} may be invoked + * concurrently from many native worker threads, so implementations must be thread-safe. + * + * <p>Comet does not maintain a TTL cache, broadcast catalog state, or schedule refresh. Vendors own + * caching, refresh, and any executor-side state distribution. Returns credentials or throws; there + * is no fall-through return value. See the user guide on S3 credential providers for the full + * contract and examples. + */ +public interface CometS3CredentialProvider { Review Comment: Should CometS3CredentialProvider extend AutoCloseable (with a default no-op close())? ########## native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs: ########## @@ -0,0 +1,458 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! JNI bridge to the `CometS3CredentialDispatcher` SPI, exposed as +//! `object_store::CredentialProvider` and `reqsign_core::ProvideCredential` for the raw Parquet +//! and Iceberg scan paths respectively. +//! +//! The bridge is activated by setting `fs.s3a.comet.credential.provider.class` (optionally +//! per-bucket) in the Hadoop configuration. The vendor's named class is instantiated once on +//! 
first use inside the JVM dispatcher and reused for the executor lifetime. +//! +//! ```text +//! JVM Native (Rust) +//! --- ------------- +//! +//! fs.s3a.comet.credential.provider.class s3.rs (object_store) +//! | iceberg_scan.rs (opendal) +//! v | +//! CometS3CredentialDispatcher v +//! (per-class instance cache) CometS3CredentialBridge +//! ^ impl object_store::CredentialProvider +//! | impl reqsign_core::ProvideCredential +//! | | +//! +<---- JNI call ----------------------------+ +//! | getCredentialsForPath(className, bucket, path, mode ordinal) +//! v +//! vendor CometS3CredentialProvider +//! | +//! v +//! CometS3Credentials POJO +//! | +//! +------- JNI field reads ---------------->+ +//! | +//! v +//! AwsCredential / IcebergAwsCredential +//! (used to sign S3 requests) +//! ``` + +use crate::execution::operators::ExecutionError; +use crate::jvm_bridge::{jni_new_global_ref, jni_static_call, JVMClasses}; +use async_trait::async_trait; +use iceberg_storage_opendal::AwsCredential as IcebergAwsCredential; Review Comment: This file lives under parquet/objectstore/ but seems to have a hard dependency on Iceberg code (iceberg_storage_opendal), here and in a couple of places below. Should we move it out into a common module? ########## spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JNI entry point invoked from native code to resolve a {@link CometS3CredentialProvider}. + * + * <p>Native code names a vendor class via the activation knob ({@code + * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code + * s3.comet.credential.provider.class} on a Spark catalog property for the Iceberg path) and a + * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg path, bucket name on + * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own instance, so two catalogs + * sharing one provider class get isolated state. + */ +public final class CometS3CredentialDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class); + + private static final ConcurrentHashMap<InstanceKey, CometS3CredentialProvider> INSTANCES = + new ConcurrentHashMap<>(); + private static final CometS3AccessMode[] MODES = CometS3AccessMode.values(); + + private CometS3CredentialDispatcher() {} + + /** + * Reflects and initializes the named provider for {@code (FQCN, dispatchKey)} exactly once per + * JVM. Subsequent calls with the same key are no-ops. 
Native code invokes this synchronously when + * {@code CometS3CredentialBridge} is constructed at plan time, before any per-request {@link + * #getCredentialsForPath} call. {@code catalogProperties} carries the unfiltered FileIO property + * bag on the Iceberg path and is empty on the Parquet path. + */ + public static void ensureInitialized( + String providerClassName, String dispatchKey, Map<String, String> catalogProperties) { + if (providerClassName == null || providerClassName.isEmpty()) { + throw new IllegalArgumentException( + "providerClassName is empty; native side should not call without a configured class"); + } + InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? "" : dispatchKey); + Map<String, String> props = + catalogProperties == null ? Collections.emptyMap() : catalogProperties; + INSTANCES.computeIfAbsent( Review Comment: A vendor whose `initialize` throws is re-attempted on every `get_credential` call from object_store. Should we cache the error per key and back off? Maybe as a follow-up. ########## spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.comet.cloud.s3; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JNI entry point invoked from native code to resolve a {@link CometS3CredentialProvider}. + * + * <p>Native code names a vendor class via the activation knob ({@code + * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code + * s3.comet.credential.provider.class} on a Spark catalog property for the Iceberg path) and a Review Comment: Iceberg is one consumer of the SPI today, but the SPI is broader than that, so I think the docs should not bake Iceberg in. ########## spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.comet.cloud.s3; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JNI entry point invoked from native code to resolve a {@link CometS3CredentialProvider}. + * + * <p>Native code names a vendor class via the activation knob ({@code + * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code + * s3.comet.credential.provider.class} on a Spark catalog property for the Iceberg path) and a + * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg path, bucket name on + * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own instance, so two catalogs + * sharing one provider class get isolated state. + */ +public final class CometS3CredentialDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class); + + private static final ConcurrentHashMap<InstanceKey, CometS3CredentialProvider> INSTANCES = + new ConcurrentHashMap<>(); + private static final CometS3AccessMode[] MODES = CometS3AccessMode.values(); + + private CometS3CredentialDispatcher() {} Review Comment: Should we have a JVM shutdown hook to clean up the cache (which could trigger close() on the instances) for proper cleanup? ########## spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.cloud.s3; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JNI entry point invoked from native code to resolve a {@link CometS3CredentialProvider}. + * + * <p>Native code names a vendor class via the activation knob ({@code + * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code + * s3.comet.credential.provider.class} on a Spark catalog property for the Iceberg path) and a + * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg path, bucket name on + * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own instance, so two catalogs + * sharing one provider class get isolated state. + */ +public final class CometS3CredentialDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class); + + private static final ConcurrentHashMap<InstanceKey, CometS3CredentialProvider> INSTANCES = + new ConcurrentHashMap<>(); + private static final CometS3AccessMode[] MODES = CometS3AccessMode.values(); + + private CometS3CredentialDispatcher() {} + + /** + * Reflects and initializes the named provider for {@code (FQCN, dispatchKey)} exactly once per + * JVM. Subsequent calls with the same key are no-ops. 
Native code invokes this synchronously when + * {@code CometS3CredentialBridge} is constructed at plan time, before any per-request {@link + * #getCredentialsForPath} call. {@code catalogProperties} carries the unfiltered FileIO property + * bag on the Iceberg path and is empty on the Parquet path. + */ + public static void ensureInitialized( + String providerClassName, String dispatchKey, Map<String, String> catalogProperties) { + if (providerClassName == null || providerClassName.isEmpty()) { + throw new IllegalArgumentException( + "providerClassName is empty; native side should not call without a configured class"); + } + InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? "" : dispatchKey); + Map<String, String> props = + catalogProperties == null ? Collections.emptyMap() : catalogProperties; + INSTANCES.computeIfAbsent( + key, + k -> { + CometS3CredentialProvider provider = instantiate(k.providerClassName); + provider.initialize(props); + return provider; + }); + } + + /** + * Invoked by native code on every per-request credential fetch. The instance must have been + * created by a prior {@link #ensureInitialized} call; otherwise this throws. {@code mode} is the + * {@link CometS3AccessMode} ordinal. + */ + public static CometS3Credentials getCredentialsForPath( + String providerClassName, String dispatchKey, String bucket, String path, int mode) + throws Exception { + if (providerClassName == null || providerClassName.isEmpty()) { + throw new IllegalArgumentException( + "providerClassName is empty; native side should not call without a configured class"); + } + if (mode < 0 || mode >= MODES.length) { + throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " + mode); + } + InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? 
"" : dispatchKey); + CometS3CredentialProvider provider = INSTANCES.get(key); + if (provider == null) { + throw new IllegalStateException( + "CometS3CredentialProvider " + + providerClassName + + " (dispatchKey=" + + key.dispatchKey + + ") was not initialized; ensureInitialized must be called before" + + " getCredentialsForPath"); + } + CometS3AccessMode accessMode = MODES[mode]; + if (LOG.isDebugEnabled()) { + LOG.debug( + "Fetching credentials via {} (dispatchKey={}) for bucket={} path={} mode={}", + providerClassName, + key.dispatchKey, + bucket, + path, + accessMode); + } + return provider.getCredentialsForPath(bucket, path, accessMode); + } + + private static CometS3CredentialProvider instantiate(String providerClassName) { + Class<?> clazz; + try { + clazz = Class.forName(providerClassName); + } catch (ClassNotFoundException e) { + throw new IllegalStateException( + "CometS3CredentialProvider class not found: " + + providerClassName + + ". Ensure the vendor JAR is on the executor classpath.", + e); + } + if (!CometS3CredentialProvider.class.isAssignableFrom(clazz)) { + throw new IllegalStateException( + providerClassName + " does not implement " + CometS3CredentialProvider.class.getName()); + } + try { + Object instance = clazz.getDeclaredConstructor().newInstance(); + LOG.info("Instantiated CometS3CredentialProvider {}", providerClassName); + return (CometS3CredentialProvider) instance; + } catch (NoSuchMethodException e) { + throw new IllegalStateException( + providerClassName + " must declare a public no-arg constructor", e); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException( + "Failed to instantiate CometS3CredentialProvider " + providerClassName, e); + } + } + + private static final class InstanceKey { Review Comment: In multi-tenant JVMs(like Spark Connect, Thrift Server, or anywhere SparkSession.newSession() is used) multiple sessions share one executor JVM. 
If two sessions (A and B) configure the same provider class with different catalogProperties, both resolve to the same (FQCN, dispatchKey) cache entry. The first session's initialize silently wins, so session B unknowingly uses session A's credentials. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
