parthchandra commented on code in PR #4309:
URL: https://github.com/apache/datafusion-comet/pull/4309#discussion_r3290491271
##########
native/core/src/parquet/objectstore/s3.rs:
##########
@@ -78,11 +79,35 @@ pub fn create_store(
source: "Missing bucket name in S3 URL".into(),
})?;
- let credential_provider =
- get_runtime().block_on(build_credential_provider(configs, bucket,
min_ttl))?;
- builder = match credential_provider {
- Some(provider) => builder.with_credentials(Arc::new(provider)),
- None => builder.with_skip_signature(true),
+ // Parquet path: catalog_properties is empty; vendors here read from
Hadoop conf.
+ let empty_props: HashMap<String, String> = HashMap::new();
+ let bridge = match lookup_provider_class(configs, bucket) {
+ Some(provider_class) => match CometS3CredentialBridge::new(
+ provider_class,
+ bucket,
+ bucket,
+ url.path(),
+ AccessMode::Read,
+ &empty_props,
+ ) {
+ Ok(b) => Some(b),
+ Err(e) => {
+ log::warn!(
Review Comment:
It would probably be better to fail here and let the user fix the error.
Falling through to the default provider chain will either fail or, worse,
succeed and lead to confusing results.
##########
native/core/src/cloud/s3/credential_bridge.rs:
##########
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JNI bridge to the JVM `CometS3CredentialDispatcher` SPI, exposed as
+//! `object_store::CredentialProvider` (raw Parquet path) and
`reqsign_core::ProvideCredential`
+//! (Iceberg via `opendal`). See
`docs/source/contributor-guide/s3-credential-provider-design.md`.
+
+use crate::execution::operators::ExecutionError;
+use crate::jvm_bridge::{jni_new_global_ref, jni_static_call, JVMClasses};
+use async_trait::async_trait;
+use iceberg_storage_opendal::AwsCredential as IcebergAwsCredential;
+use jni::objects::{Global, JFieldID, JObject, JString, JValue};
+use jni::signature::{Primitive, ReturnType};
+use jni::strings::JNIString;
+use jni::sys::jint;
+use log::warn;
+use object_store::aws::AwsCredential;
+use object_store::CredentialProvider;
+use once_cell::sync::OnceCell;
+use reqsign_core::time::Timestamp;
+use reqsign_core::{
+ Context, Error as ReqsignError, ErrorKind as ReqsignErrorKind,
+ ProvideCredential as IcebergProvideCredential,
+};
+use std::collections::HashMap;
+use std::fmt;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Cap on opendal's credential cache when the provider does not report an
expiry. Prevents the
+/// executor from holding a stale credential for the entire job lifetime.
+const DEFAULT_EXPIRY_WHEN_UNKNOWN: Duration = Duration::from_secs(300);
+
+/// Once-per-process latch for the "missing expiry" warning. Bridges are
per-scan, so a per-bridge
+/// latch would re-log on every scan.
+static WARNED_MISSING_EXPIRY: OnceCell<()> = OnceCell::new();
+
+/// Access intent forwarded to the Java SPI. Ordinal must match the JVM
`CometS3AccessMode` enum.
+#[derive(Debug, Clone, Copy)]
+pub enum AccessMode {
+ Read = 0,
+ #[allow(dead_code)]
+ Write = 1,
+}
+
+/// Per-scan credential provider that delegates to the JVM SPI via JNI.
`handle` is the JVM-side
+/// identity for the `(provider_class, dispatch_key, catalog_properties)`
triple returned by
+/// `ensureInitialized`. `bucket_jstr` / `path_jstr` are interned once at
construction to avoid
Review Comment:
Couldn't we have per-path credentials?
##########
spark/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.cloud.s3;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.comet.util.ClassLoaders;
+
+/**
+ * JNI entry point that resolves a {@link CometS3CredentialProvider} for
native code.
+ *
+ * <p>{@link #ensureInitialized} reflects the named class, runs {@code
initialize(Map)} once, and
+ * returns a {@code long} handle. {@link #getCredentialsForPath} takes that
handle on every
+ * per-request call. See the design notes in the contributor guide for why the
SPI is shaped this
+ * way (keying, multi-tenant isolation, shutdown lifecycle).
+ */
+public final class CometS3CredentialDispatcher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CometS3CredentialDispatcher.class);
+
+ private static final ConcurrentHashMap<InstanceKey, Long> KEY_TO_HANDLE =
+ new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<Long, RegisteredProvider> INSTANCES =
+ new ConcurrentHashMap<>();
+ private static final AtomicLong HANDLE_SEQ = new AtomicLong(1L);
+ private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();
+
+ static {
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(CometS3CredentialDispatcher::closeAll,
"comet-s3-credential-shutdown"));
+ }
+
+ private CometS3CredentialDispatcher() {}
+
+ /**
+ * Reflects and initializes the named provider on first call for the {@code
(FQCN, dispatchKey,
+ * catalogProperties)} triple, and returns a handle reused by subsequent
{@link
+ * #getCredentialsForPath} calls.
+ */
+ public static long ensureInitialized(
+ String providerClassName, String dispatchKey, Map<String, String>
catalogProperties) {
+ if (providerClassName == null || providerClassName.isEmpty()) {
+ throw new IllegalArgumentException(
+ "providerClassName is empty; native side should not call without a
configured class");
+ }
+ Map<String, String> snapshot =
+ catalogProperties == null
+ ? Collections.emptyMap()
+ : Collections.unmodifiableMap(new HashMap<>(catalogProperties));
+ InstanceKey key =
+ new InstanceKey(providerClassName, dispatchKey == null ? "" :
dispatchKey, snapshot);
+ return KEY_TO_HANDLE.computeIfAbsent(
+ key,
+ k -> {
+ CometS3CredentialProvider provider =
instantiate(k.providerClassName);
+ provider.initialize(k.catalogProperties);
+ long handle = HANDLE_SEQ.getAndIncrement();
+ INSTANCES.put(handle, new RegisteredProvider(provider, k));
+ return handle;
+ });
+ }
+
+ /**
+ * Invoked by native code on every per-request credential fetch. {@code
handle} must be a value
+ * returned by a prior {@link #ensureInitialized} call; otherwise this
throws. {@code mode} is the
+ * {@link CometS3AccessMode} ordinal.
+ */
+ public static CometS3Credentials getCredentialsForPath(
+ long handle, String bucket, String path, int mode) throws Exception {
+ if (mode < 0 || mode >= MODES.length) {
+ throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " + mode);
+ }
+ RegisteredProvider registered = INSTANCES.get(handle);
+ if (registered == null) {
+ throw new IllegalStateException(
+ "CometS3CredentialProvider handle "
+ + handle
+ + " was not initialized; "
+ + "ensureInitialized must be called before
getCredentialsForPath");
+ }
+ CometS3AccessMode accessMode = MODES[mode];
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Fetching credentials via {} (dispatchKey={}, handle={}) for
bucket={} path={} mode={}",
+ registered.key.providerClassName,
+ registered.key.dispatchKey,
+ handle,
+ bucket,
+ path,
+ accessMode);
+ }
+ return registered.provider.getCredentialsForPath(
+ new CometS3CredentialContext(bucket, path, accessMode));
+ }
+
+ private static CometS3CredentialProvider instantiate(String
providerClassName) {
+ Class<?> clazz;
+ try {
+ clazz = ClassLoaders.loadClass(providerClassName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(
+ "CometS3CredentialProvider class not found: "
+ + providerClassName
+ + ". Ensure the vendor JAR is on the executor classpath.",
+ e);
+ }
+ if (!CometS3CredentialProvider.class.isAssignableFrom(clazz)) {
+ throw new IllegalStateException(
+ providerClassName + " does not implement " +
CometS3CredentialProvider.class.getName());
+ }
+ try {
+ Object instance = clazz.getDeclaredConstructor().newInstance();
+ LOG.info("Instantiated CometS3CredentialProvider {}", providerClassName);
+ return (CometS3CredentialProvider) instance;
+ } catch (NoSuchMethodException e) {
+ throw new IllegalStateException(
+ providerClassName + " must declare a public no-arg constructor", e);
+ } catch (InstantiationException | IllegalAccessException |
InvocationTargetException e) {
+ throw new IllegalStateException(
+ "Failed to instantiate CometS3CredentialProvider " +
providerClassName, e);
+ }
+ }
+
+ /** Visible for tests; otherwise invoked from the JVM shutdown hook. */
+ static void closeAll() {
+ for (RegisteredProvider registered : INSTANCES.values()) {
+ try {
+ registered.provider.close();
+ } catch (Throwable t) {
+ LOG.warn(
+ "Exception from {} (dispatchKey={}).close() during shutdown",
+ registered.key.providerClassName,
+ registered.key.dispatchKey,
+ t);
+ }
+ }
+ INSTANCES.clear();
+ KEY_TO_HANDLE.clear();
+ }
+
+ private static final class InstanceKey {
+ final String providerClassName;
+ final String dispatchKey;
+ final Map<String, String> catalogProperties;
Review Comment:
This could lead to `KEY_TO_HANDLE` growing quite large if there are many
(JVM) sessions, or if a catalog implementation refreshes some catalog
property per table. We could cap the size of `KEY_TO_HANDLE` and evict older
keys to keep it bounded.
##########
spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala:
##########
@@ -367,12 +367,13 @@ case class CometScanRule(session: SparkSession)
val hadoopDerivedProperties =
CometIcebergNativeScan.hadoopToIcebergS3Properties(hadoopS3Options)
- // Extract vended credentials from FileIO (REST catalog credential
vending).
- // FileIO properties take precedence over Hadoop-derived
properties because
- // they contain per-table credentials vended by the REST catalog.
+ // Forward the full FileIO property bag (including
credentials.uri, OAuth tokens,
Review Comment:
Properties such as OAuth tokens and bearer tokens should not really be
forwarded here, because they get baked into a protobuf that is sent
unencrypted over the wire to executors. Also, if the tokens have an expiry,
they need to be refreshed or the credentials provider will fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]