karuppayya commented on code in PR #4309:
URL: https://github.com/apache/datafusion-comet/pull/4309#discussion_r3283556553


##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI entry point invoked from native code to resolve a {@link 
CometS3CredentialProvider}.
+ *
+ * <p>Native code names a vendor class via the activation knob ({@code
+ * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code
+ * s3.comet.credential.provider.class} on a Spark catalog property for the 
Iceberg path) and a
+ * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg 
path, bucket name on
+ * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own 
instance, so two catalogs
+ * sharing one provider class get isolated state.
+ */
+public final class CometS3CredentialDispatcher {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CometS3CredentialDispatcher.class);
+
+  private static final ConcurrentHashMap<InstanceKey, 
CometS3CredentialProvider> INSTANCES =
+      new ConcurrentHashMap<>();
+  private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();
+
+  private CometS3CredentialDispatcher() {}
+
+  /**
+   * Reflects and initializes the named provider for {@code (FQCN, 
dispatchKey)} exactly once per
+   * JVM. Subsequent calls with the same key are no-ops. Native code invokes 
this synchronously when
+   * {@code CometS3CredentialBridge} is constructed at plan time, before any 
per-request {@link
+   * #getCredentialsForPath} call. {@code catalogProperties} carries the 
unfiltered FileIO property
+   * bag on the Iceberg path and is empty on the Parquet path.
+   */
+  public static void ensureInitialized(
+      String providerClassName, String dispatchKey, Map<String, String> 
catalogProperties) {
+    if (providerClassName == null || providerClassName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "providerClassName is empty; native side should not call without a 
configured class");
+    }
+    InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? 
"" : dispatchKey);
+    Map<String, String> props =
+        catalogProperties == null ? Collections.emptyMap() : catalogProperties;
+    INSTANCES.computeIfAbsent(
+        key,
+        k -> {
+          CometS3CredentialProvider provider = 
instantiate(k.providerClassName);
+          provider.initialize(props);
+          return provider;
+        });
+  }
+
+  /**
+   * Invoked by native code on every per-request credential fetch. The 
instance must have been
+   * created by a prior {@link #ensureInitialized} call; otherwise this 
throws. {@code mode} is the
+   * {@link CometS3AccessMode} ordinal.
+   */
+  public static CometS3Credentials getCredentialsForPath(
+      String providerClassName, String dispatchKey, String bucket, String 
path, int mode)
+      throws Exception {
+    if (providerClassName == null || providerClassName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "providerClassName is empty; native side should not call without a 
configured class");
+    }
+    if (mode < 0 || mode >= MODES.length) {
+      throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " 
+ mode);
+    }
+    InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? 
"" : dispatchKey);
+    CometS3CredentialProvider provider = INSTANCES.get(key);
+    if (provider == null) {
+      throw new IllegalStateException(
+          "CometS3CredentialProvider "
+              + providerClassName
+              + " (dispatchKey="
+              + key.dispatchKey
+              + ") was not initialized; ensureInitialized must be called 
before"
+              + " getCredentialsForPath");
+    }
+    CometS3AccessMode accessMode = MODES[mode];
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Fetching credentials via {} (dispatchKey={}) for bucket={} path={} 
mode={}",
+          providerClassName,
+          key.dispatchKey,
+          bucket,
+          path,
+          accessMode);
+    }
+    return provider.getCredentialsForPath(bucket, path, accessMode);
+  }
+
+  private static CometS3CredentialProvider instantiate(String 
providerClassName) {
+    Class<?> clazz;
+    try {
+      clazz = Class.forName(providerClassName);

Review Comment:
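   We should move `org.apache.comet.iceberg.IcebergReflection.loadClass` out into a shared helper and use it here instead of a bare `Class.forName`, so provider classes are loaded the same way on both paths.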



##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.util.Map;
+
+/**
+ * SPI for supplying AWS credentials to Comet's native S3 readers, which 
bypass Spark's Hadoop S3A
+ * code path. Vendors implement this when path-aware or vendor-managed 
credential mechanisms cannot
+ * be reached through the standard parameterless {@code 
AWSCredentialsProvider.getCredentials()}
+ * contract.
+ *
+ * <p>Vendors register an implementation by setting {@code
+ * spark.hadoop.fs.s3a.comet.credential.provider.class} (or the per-bucket 
form {@code
+ * spark.hadoop.fs.s3a.bucket.<name>.comet.credential.provider.class}) for the 
Parquet path, or
+ * {@code spark.sql.catalog.<catalog>.s3.comet.credential.provider.class} for 
the Iceberg path. The
+ * class must have a public no-arg constructor.
+ *
+ * <p>Comet keys provider instances by {@code (FQCN, dispatchKey)}, where 
{@code dispatchKey} is the
+ * Spark V2 catalog name on the Iceberg path and the bucket on the Parquet 
path. The first time a
+ * given key is seen on an executor, Comet reflects the class, calls {@link 
#initialize(Map)} once,
+ * and caches the instance. Two catalogs that share one FQCN therefore get 
isolated instances with
+ * their own {@code initialize} maps.
+ *
+ * <p>{@link #initialize(Map)} should be cheap and non-blocking; defer real 
credential fetches to
+ * the first {@link #getCredentialsForPath} call. {@link 
#getCredentialsForPath} may be invoked
+ * concurrently from many native worker threads, so implementations must be 
thread-safe.
+ *
+ * <p>Comet does not maintain a TTL cache, broadcast catalog state, or 
schedule refresh. Vendors own
+ * caching, refresh, and any executor-side state distribution. Returns 
credentials or throws; there
+ * is no fall-through return value. See the user guide on S3 credential 
providers for the full
+ * contract and examples.
+ */
+public interface CometS3CredentialProvider {
+
+  /**
+   * Called once per {@code (FQCN, dispatchKey)} on each executor before any 
{@link
+   * #getCredentialsForPath} call. The {@code catalogProperties} map carries 
the full FileIO
+   * property bag for the Iceberg path (including {@code credentials.uri}, 
OAuth tokens, vendor keys
+   * like {@code tenant-id}) and is empty on the Parquet path. The default 
no-op keeps Parquet
+   * vendors source-compatible.
+   *
+   * @param catalogProperties unfiltered FileIO/catalog properties; may 
contain secrets, do not log
+   */
+  default void initialize(Map<String, String> catalogProperties) {}
+
+  /**
+   * @param bucket S3 bucket name (no scheme, no path)
+   * @param path object key or prefix, leading slash included (matches the URL 
path component)
+   * @param mode access intent for this request
+   * @return non-null credentials; {@code null} is a contract violation
+   */
+  CometS3Credentials getCredentialsForPath(String bucket, String path, 
CometS3AccessMode mode)

Review Comment:
   Should we wrap (bucket, path, mode) in a context object? If we add a new field later, the method signature would change, and vendors would have to recompile their JARs to keep working.



##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.util.Map;
+
+/**
+ * SPI for supplying AWS credentials to Comet's native S3 readers, which 
bypass Spark's Hadoop S3A
+ * code path. Vendors implement this when path-aware or vendor-managed 
credential mechanisms cannot
+ * be reached through the standard parameterless {@code 
AWSCredentialsProvider.getCredentials()}
+ * contract.
+ *
+ * <p>Vendors register an implementation by setting {@code
+ * spark.hadoop.fs.s3a.comet.credential.provider.class} (or the per-bucket 
form {@code
+ * spark.hadoop.fs.s3a.bucket.<name>.comet.credential.provider.class}) for the 
Parquet path, or
+ * {@code spark.sql.catalog.<catalog>.s3.comet.credential.provider.class} for 
the Iceberg path. The
+ * class must have a public no-arg constructor.
+ *
+ * <p>Comet keys provider instances by {@code (FQCN, dispatchKey)}, where 
{@code dispatchKey} is the
+ * Spark V2 catalog name on the Iceberg path and the bucket on the Parquet 
path. The first time a
+ * given key is seen on an executor, Comet reflects the class, calls {@link 
#initialize(Map)} once,
+ * and caches the instance. Two catalogs that share one FQCN therefore get 
isolated instances with
+ * their own {@code initialize} maps.
+ *
+ * <p>{@link #initialize(Map)} should be cheap and non-blocking; defer real 
credential fetches to
+ * the first {@link #getCredentialsForPath} call. {@link 
#getCredentialsForPath} may be invoked
+ * concurrently from many native worker threads, so implementations must be 
thread-safe.
+ *
+ * <p>Comet does not maintain a TTL cache, broadcast catalog state, or 
schedule refresh. Vendors own
+ * caching, refresh, and any executor-side state distribution. Returns 
credentials or throws; there
+ * is no fall-through return value. See the user guide on S3 credential 
providers for the full
+ * contract and examples.
+ */
+public interface CometS3CredentialProvider {

Review Comment:
   Should CometS3CredentialProvider extend AutoCloseable (with a default no-op 
close())?



##########
native/core/src/parquet/objectstore/comet_s3_credential_bridge.rs:
##########
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JNI bridge to the `CometS3CredentialDispatcher` SPI, exposed as
+//! `object_store::CredentialProvider` and `reqsign_core::ProvideCredential` 
for the raw Parquet
+//! and Iceberg scan paths respectively.
+//!
+//! The bridge is activated by setting 
`fs.s3a.comet.credential.provider.class` (optionally
+//! per-bucket) in the Hadoop configuration. The vendor's named class is 
instantiated once on
+//! first use inside the JVM dispatcher and reused for the executor lifetime.
+//!
+//! ```text
+//!   JVM                                        Native (Rust)
+//!   ---                                        -------------
+//!
+//!   fs.s3a.comet.credential.provider.class     s3.rs (object_store)
+//!         |                                    iceberg_scan.rs (opendal)
+//!         v                                              |
+//!   CometS3CredentialDispatcher                          v
+//!   (per-class instance cache)                  CometS3CredentialBridge
+//!         ^                                       impl 
object_store::CredentialProvider
+//!         |                                       impl 
reqsign_core::ProvideCredential
+//!         |                                              |
+//!         +<---- JNI call ----------------------------+
+//!         |   getCredentialsForPath(className, bucket, path, mode ordinal)
+//!         v
+//!   vendor CometS3CredentialProvider
+//!         |
+//!         v
+//!   CometS3Credentials POJO
+//!         |
+//!         +------- JNI field reads ---------------->+
+//!                                                   |
+//!                                                   v
+//!                                        AwsCredential / IcebergAwsCredential
+//!                                        (used to sign S3 requests)
+//! ```
+
+use crate::execution::operators::ExecutionError;
+use crate::jvm_bridge::{jni_new_global_ref, jni_static_call, JVMClasses};
+use async_trait::async_trait;
+use iceberg_storage_opendal::AwsCredential as IcebergAwsCredential;

Review Comment:
   This file lives under parquet/objectstore/ but seems to have a hard dependency on Iceberg code (iceberg_storage_opendal), here and in a couple of places below. Should we move it into a common module?



##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI entry point invoked from native code to resolve a {@link 
CometS3CredentialProvider}.
+ *
+ * <p>Native code names a vendor class via the activation knob ({@code
+ * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code
+ * s3.comet.credential.provider.class} on a Spark catalog property for the 
Iceberg path) and a
+ * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg 
path, bucket name on
+ * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own 
instance, so two catalogs
+ * sharing one provider class get isolated state.
+ */
+public final class CometS3CredentialDispatcher {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CometS3CredentialDispatcher.class);
+
+  private static final ConcurrentHashMap<InstanceKey, 
CometS3CredentialProvider> INSTANCES =
+      new ConcurrentHashMap<>();
+  private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();
+
+  private CometS3CredentialDispatcher() {}
+
+  /**
+   * Reflects and initializes the named provider for {@code (FQCN, 
dispatchKey)} exactly once per
+   * JVM. Subsequent calls with the same key are no-ops. Native code invokes 
this synchronously when
+   * {@code CometS3CredentialBridge} is constructed at plan time, before any 
per-request {@link
+   * #getCredentialsForPath} call. {@code catalogProperties} carries the 
unfiltered FileIO property
+   * bag on the Iceberg path and is empty on the Parquet path.
+   */
+  public static void ensureInitialized(
+      String providerClassName, String dispatchKey, Map<String, String> 
catalogProperties) {
+    if (providerClassName == null || providerClassName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "providerClassName is empty; native side should not call without a 
configured class");
+    }
+    InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? 
"" : dispatchKey);
+    Map<String, String> props =
+        catalogProperties == null ? Collections.emptyMap() : catalogProperties;
+    INSTANCES.computeIfAbsent(

Review Comment:
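   A vendor whose `initialize` throws gets re-attempted on every `get_credential` call from object_store. `ConcurrentHashMap.computeIfAbsent` records no mapping when the mapping function throws, so nothing remembers the failure. Should we cache the error per key and back off? Maybe as a follow-up.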



##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI entry point invoked from native code to resolve a {@link 
CometS3CredentialProvider}.
+ *
+ * <p>Native code names a vendor class via the activation knob ({@code
+ * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code
+ * s3.comet.credential.provider.class} on a Spark catalog property for the 
Iceberg path) and a

Review Comment:
   Iceberg is one consumer of the SPI today, but the SPI is broader than that, and I think the docs should not bake it in.



##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI entry point invoked from native code to resolve a {@link 
CometS3CredentialProvider}.
+ *
+ * <p>Native code names a vendor class via the activation knob ({@code
+ * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code
+ * s3.comet.credential.provider.class} on a Spark catalog property for the 
Iceberg path) and a
+ * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg 
path, bucket name on
+ * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own 
instance, so two catalogs
+ * sharing one provider class get isolated state.
+ */
+public final class CometS3CredentialDispatcher {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CometS3CredentialDispatcher.class);
+
+  private static final ConcurrentHashMap<InstanceKey, 
CometS3CredentialProvider> INSTANCES =
+      new ConcurrentHashMap<>();
+  private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();
+
+  private CometS3CredentialDispatcher() {}

Review Comment:
   Should we have a JVM shutdown hook that clears the cache (which could call `close()` on the instances) for proper cleanup?



##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNI entry point invoked from native code to resolve a {@link 
CometS3CredentialProvider}.
+ *
+ * <p>Native code names a vendor class via the activation knob ({@code
+ * fs.s3a.comet.credential.provider.class} for the Parquet path, {@code
+ * s3.comet.credential.provider.class} on a Spark catalog property for the 
Iceberg path) and a
+ * {@code dispatchKey} that scopes the instance: catalog name on the Iceberg 
path, bucket name on
+ * the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own 
instance, so two catalogs
+ * sharing one provider class get isolated state.
+ */
+public final class CometS3CredentialDispatcher {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CometS3CredentialDispatcher.class);
+
+  private static final ConcurrentHashMap<InstanceKey, 
CometS3CredentialProvider> INSTANCES =
+      new ConcurrentHashMap<>();
+  private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();
+
+  private CometS3CredentialDispatcher() {}
+
+  /**
+   * Reflects and initializes the named provider for {@code (FQCN, 
dispatchKey)} exactly once per
+   * JVM. Subsequent calls with the same key are no-ops. Native code invokes 
this synchronously when
+   * {@code CometS3CredentialBridge} is constructed at plan time, before any 
per-request {@link
+   * #getCredentialsForPath} call. {@code catalogProperties} carries the 
unfiltered FileIO property
+   * bag on the Iceberg path and is empty on the Parquet path.
+   */
+  public static void ensureInitialized(
+      String providerClassName, String dispatchKey, Map<String, String> 
catalogProperties) {
+    if (providerClassName == null || providerClassName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "providerClassName is empty; native side should not call without a 
configured class");
+    }
+    InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? 
"" : dispatchKey);
+    Map<String, String> props =
+        catalogProperties == null ? Collections.emptyMap() : catalogProperties;
+    INSTANCES.computeIfAbsent(
+        key,
+        k -> {
+          CometS3CredentialProvider provider = 
instantiate(k.providerClassName);
+          provider.initialize(props);
+          return provider;
+        });
+  }
+
+  /**
+   * Invoked by native code on every per-request credential fetch. The 
instance must have been
+   * created by a prior {@link #ensureInitialized} call; otherwise this 
throws. {@code mode} is the
+   * {@link CometS3AccessMode} ordinal.
+   */
+  public static CometS3Credentials getCredentialsForPath(
+      String providerClassName, String dispatchKey, String bucket, String 
path, int mode)
+      throws Exception {
+    if (providerClassName == null || providerClassName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "providerClassName is empty; native side should not call without a 
configured class");
+    }
+    if (mode < 0 || mode >= MODES.length) {
+      throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " 
+ mode);
+    }
+    InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? 
"" : dispatchKey);
+    CometS3CredentialProvider provider = INSTANCES.get(key);
+    if (provider == null) {
+      throw new IllegalStateException(
+          "CometS3CredentialProvider "
+              + providerClassName
+              + " (dispatchKey="
+              + key.dispatchKey
+              + ") was not initialized; ensureInitialized must be called 
before"
+              + " getCredentialsForPath");
+    }
+    CometS3AccessMode accessMode = MODES[mode];
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Fetching credentials via {} (dispatchKey={}) for bucket={} path={} 
mode={}",
+          providerClassName,
+          key.dispatchKey,
+          bucket,
+          path,
+          accessMode);
+    }
+    return provider.getCredentialsForPath(bucket, path, accessMode);
+  }
+
+  private static CometS3CredentialProvider instantiate(String 
providerClassName) {
+    Class<?> clazz;
+    try {
+      clazz = Class.forName(providerClassName);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(
+          "CometS3CredentialProvider class not found: "
+              + providerClassName
+              + ". Ensure the vendor JAR is on the executor classpath.",
+          e);
+    }
+    if (!CometS3CredentialProvider.class.isAssignableFrom(clazz)) {
+      throw new IllegalStateException(
+          providerClassName + " does not implement " + 
CometS3CredentialProvider.class.getName());
+    }
+    try {
+      Object instance = clazz.getDeclaredConstructor().newInstance();
+      LOG.info("Instantiated CometS3CredentialProvider {}", providerClassName);
+      return (CometS3CredentialProvider) instance;
+    } catch (NoSuchMethodException e) {
+      throw new IllegalStateException(
+          providerClassName + " must declare a public no-arg constructor", e);
+    } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException e) {
+      throw new IllegalStateException(
+          "Failed to instantiate CometS3CredentialProvider " + 
providerClassName, e);
+    }
+  }
+
+  private static final class InstanceKey {

Review Comment:
   In multi-tenant JVMs (like Spark Connect, the Thrift Server, or anywhere SparkSession.newSession() is used), multiple sessions share one executor JVM. Two sessions that configure the same provider class with different catalogProperties will both resolve to the same (FQCN, dispatchKey) cache entry.
   The first session's `initialize` silently wins, so session B unknowingly uses session A's credentials.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to