This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 511fe6fae6 NIFI-15176 Added Access Delegation Strategy to REST Iceberg Catalog
511fe6fae6 is described below

commit 511fe6fae6c7d6dfbe94645fa9dc146a07132b43
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Nov 3 10:16:19 2025 -0600

    NIFI-15176 Added Access Delegation Strategy to REST Iceberg Catalog
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10492.
---
 .../iceberg/catalog/AccessDelegationStrategy.java  | 55 +++++++++++++++
 .../iceberg/catalog/RESTClientProvider.java        | 35 ++++++++++
 .../iceberg/catalog/RESTIcebergCatalog.java        | 33 ++++-----
 .../catalog/StandardRESTClientProvider.java        | 81 ++++++++++++++++++++++
 .../catalog/StandardRESTClientProviderTest.java    | 74 ++++++++++++++++++++
 5 files changed, 262 insertions(+), 16 deletions(-)

diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AccessDelegationStrategy.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AccessDelegationStrategy.java
new file mode 100644
index 0000000000..b0daa9c0e1
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AccessDelegationStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported strategies for REST Catalog X-Iceberg-Access-Delegation request header
+ */
+enum AccessDelegationStrategy implements DescribedValue {
+    DISABLED("disabled", "Disabled", "Disables sending the X-Iceberg-Access-Delegation request header"),
+
+    VENDED_CREDENTIALS("vended-credentials", "Vended Credentials", "Request vended credentials from REST Catalog");
+
+    private final String value;
+
+    private final String displayName;
+
+    private final String description;
+
+    AccessDelegationStrategy(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTClientProvider.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTClientProvider.java
new file mode 100644
index 0000000000..5c2109c568
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTClientProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.rest.RESTClient;
+
+import java.util.Map;
+
+/**
+ * Abstraction for constructing an Apache Iceberg REST Client from provided properties
+ */
+@FunctionalInterface
+interface RESTClientProvider {
+    /**
+     * Build Iceberg REST Client using properties
+     *
+     * @param properties Configuration properties
+     * @return REST Client
+     */
+    RESTClient build(Map<String, String> properties);
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
index 1b3910b838..4c4a9e156e 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
@@ -23,9 +23,7 @@ import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.SessionCatalog;
 import org.apache.iceberg.metrics.LoggingMetricsReporter;
-import org.apache.iceberg.rest.HTTPClient;
 import org.apache.iceberg.rest.RESTClient;
-import org.apache.iceberg.rest.auth.AuthSession;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -71,6 +69,14 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
             .identifiesControllerService(IcebergFileIOProvider.class)
             .build();
 
+    static final PropertyDescriptor ACCESS_DELEGATION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Access Delegation Strategy")
+            .description("Strategy for requesting Access Delegation 
credentials from the Apache Iceberg REST Catalog")
+            .required(true)
+            .defaultValue(AccessDelegationStrategy.VENDED_CREDENTIALS)
+            .allowableValues(AccessDelegationStrategy.class)
+            .build();
+
     static final PropertyDescriptor AUTHENTICATION_STRATEGY = new 
PropertyDescriptor.Builder()
             .name("Authentication Strategy")
             .description("Strategy for authenticating with the Apache Iceberg 
Catalog over HTTP")
@@ -142,6 +148,7 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
             CATALOG_URI,
             FILE_IO_PROVIDER,
+            ACCESS_DELEGATION_STRATEGY,
             AUTHENTICATION_STRATEGY,
             BEARER_TOKEN,
             AUTHORIZATION_SERVER_URI,
@@ -239,6 +246,12 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
         final String catalogUri = context.getProperty(CATALOG_URI).getValue();
         properties.put(CatalogProperties.URI, catalogUri);
 
+        // Set Access Delegation Header for REST Client building
+        final AccessDelegationStrategy accessDelegationStrategy = 
context.getProperty(ACCESS_DELEGATION_STRATEGY).asAllowableValue(AccessDelegationStrategy.class);
+        if (AccessDelegationStrategy.VENDED_CREDENTIALS == 
accessDelegationStrategy) {
+            
properties.put(StandardRESTClientProvider.ICEBERG_ACCESS_DELEGATION_HEADER, 
accessDelegationStrategy.getValue());
+        }
+
         final PropertyValue warehouseLocationProperty = 
context.getProperty(WAREHOUSE_LOCATION);
         if (warehouseLocationProperty.isSet()) {
             final String warehouseLocation = 
warehouseLocationProperty.getValue();
@@ -246,13 +259,10 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
         }
 
         final AuthenticationStrategy authenticationStrategy = 
context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AuthenticationStrategy.class);
-        final Function<Map<String, String>, RESTClient> restClientBuilder;
-
         if (AuthenticationStrategy.BEARER == authenticationStrategy) {
             final String bearerToken = 
context.getProperty(BEARER_TOKEN).getValue();
             properties.put(OAuth2Properties.TOKEN, bearerToken);
             properties.put(AuthProperties.AUTH_TYPE, 
AuthProperties.AUTH_TYPE_OAUTH2);
-            restClientBuilder = this::buildStandardRestClient;
         } else {
             final String authorizationServerUri = 
context.getProperty(AUTHORIZATION_SERVER_URI).getValue();
             properties.put(OAuth2Properties.OAUTH2_SERVER_URI, 
authorizationServerUri);
@@ -267,15 +277,12 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
                 final String clientSecret = 
context.getProperty(CLIENT_SECRET).getValue();
                 final String clientCredentials = 
CLIENT_CREDENTIALS_FORMAT.formatted(clientId, clientSecret);
                 properties.put(OAuth2Properties.CREDENTIAL, clientCredentials);
-
-                restClientBuilder = configuration -> new 
CredentialsRefreshRESTClient(getLogger(), 
buildStandardRestClient(configuration), clientId, clientSecret);
-            } else {
-                restClientBuilder = this::buildStandardRestClient;
             }
         }
 
+        final RESTClientProvider restClientProvider = new 
StandardRESTClientProvider(getLogger());
         final IcebergFileIOProvider icebergFileIoProvider = 
context.getProperty(FILE_IO_PROVIDER).asControllerService(IcebergFileIOProvider.class);
-        return getSessionCatalog(restClientBuilder, icebergFileIoProvider, 
properties);
+        return getSessionCatalog(restClientProvider::build, 
icebergFileIoProvider, properties);
     }
 
     private RESTSessionCatalog getSessionCatalog(
@@ -296,12 +303,6 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
         return restSessionCatalog;
     }
 
-    private RESTClient buildStandardRestClient(final Map<String, String> 
configuration) {
-        final String uri = configuration.get(CatalogProperties.URI);
-        // Set empty Authentication Session to avoid runtime exceptions in 
BaseHTTPClient class
-        return 
HTTPClient.builder(configuration).uri(uri).withAuthSession(AuthSession.EMPTY).build();
-    }
-
     private Map<String, String> getDynamicProperties(final 
ConfigurationContext context) {
         final Map<String, String> properties = new HashMap<>();
 
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProvider.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProvider.java
new file mode 100644
index 0000000000..afd50c26bf
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Standard implementation of REST Client Provider supporting configurable Access Delegation
+ */
+class StandardRESTClientProvider implements RESTClientProvider {
+    /** Access Delegation HTTP Request Header added in Apache Iceberg 1.5.0 */
+    static final String ICEBERG_ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation";
+
+    private static final Pattern CLIENT_CREDENTIALS_PATTERN = Pattern.compile("^([^:]+):(.+)$");
+
+    private static final int CLIENT_ID_GROUP = 1;
+
+    private static final int CLIENT_SECRET_GROUP = 2;
+
+    private final ComponentLog log;
+
+    StandardRESTClientProvider(final ComponentLog log) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+    }
+
+    @Override
+    public RESTClient build(final Map<String, String> properties) {
+        Objects.requireNonNull(properties, "Properties required");
+        final String uri = properties.get(CatalogProperties.URI);
+
+        // Set empty Authentication Session to avoid runtime exceptions in BaseHTTPClient class
+        final HTTPClient.Builder builder = HTTPClient.builder(properties).uri(uri).withAuthSession(AuthSession.EMPTY);
+
+        if (properties.containsKey(ICEBERG_ACCESS_DELEGATION_HEADER)) {
+            final String accessDelegationHeader = properties.get(ICEBERG_ACCESS_DELEGATION_HEADER);
+            builder.withHeader(ICEBERG_ACCESS_DELEGATION_HEADER, accessDelegationHeader);
+        }
+
+        final RESTClient restClient;
+
+        if (properties.containsKey(OAuth2Properties.CREDENTIAL)) {
+            final String credential = properties.get(OAuth2Properties.CREDENTIAL);
+            final Matcher clientCredentialsMatcher = CLIENT_CREDENTIALS_PATTERN.matcher(credential);
+            if (clientCredentialsMatcher.matches()) {
+                final String clientId = clientCredentialsMatcher.group(CLIENT_ID_GROUP);
+                final String clientSecret = clientCredentialsMatcher.group(CLIENT_SECRET_GROUP);
+                restClient = new CredentialsRefreshRESTClient(log, builder.build(), clientId, clientSecret);
+            } else {
+                throw new IllegalStateException("OAuth 2 Client Credentials format not found");
+            }
+        } else {
+            restClient = builder.build();
+        }
+
+        return restClient;
+    }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/test/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProviderTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/test/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProviderTest.java
new file mode 100644
index 0000000000..821386fcf8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/test/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProviderTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class StandardRESTClientProviderTest {
+
+    private static final String CATALOG_URI = "https://localhost";
+
+    private static final String CLIENT_CREDENTIALS = "CLIENT_ID:CLIENT_SECRET";
+
+    private static final ComponentLog COMPONENT_LOG = new MockComponentLog(StandardRESTClientProvider.class.getName(), StandardRESTClientProvider.class);
+
+    private final StandardRESTClientProvider provider = new StandardRESTClientProvider(COMPONENT_LOG);
+
+    @Test
+    void testBuild() {
+        final Map<String, String> properties = Map.of(CatalogProperties.URI, CATALOG_URI);
+
+        final RESTClient restClient = provider.build(properties);
+
+        assertNotNull(restClient);
+    }
+
+    @Test
+    void testBuildAccessDelegationHeader() {
+        final Map<String, String> properties = Map.of(
+                CatalogProperties.URI, CATALOG_URI,
+                StandardRESTClientProvider.ICEBERG_ACCESS_DELEGATION_HEADER, AccessDelegationStrategy.VENDED_CREDENTIALS.getValue()
+        );
+
+        final RESTClient restClient = provider.build(properties);
+
+        assertNotNull(restClient);
+    }
+
+    @Test
+    void testBuildClientCredentials() {
+        final Map<String, String> properties = Map.of(
+                CatalogProperties.URI, CATALOG_URI,
+                OAuth2Properties.CREDENTIAL, CLIENT_CREDENTIALS
+        );
+
+        final RESTClient restClient = provider.build(properties);
+
+        assertNotNull(restClient);
+        assertInstanceOf(CredentialsRefreshRESTClient.class, restClient);
+    }
+}

Reply via email to