This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 511fe6fae6 NIFI-15176 Added Access Delegation Strategy to REST Iceberg Catalog
511fe6fae6 is described below
commit 511fe6fae6c7d6dfbe94645fa9dc146a07132b43
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Nov 3 10:16:19 2025 -0600
NIFI-15176 Added Access Delegation Strategy to REST Iceberg Catalog
Signed-off-by: Pierre Villard <[email protected]>
This closes #10492.
---
.../iceberg/catalog/AccessDelegationStrategy.java | 55 +++++++++++++++
.../iceberg/catalog/RESTClientProvider.java | 35 ++++++++++
.../iceberg/catalog/RESTIcebergCatalog.java | 33 ++++-----
.../catalog/StandardRESTClientProvider.java | 81 ++++++++++++++++++++++
.../catalog/StandardRESTClientProviderTest.java | 74 ++++++++++++++++++++
5 files changed, 262 insertions(+), 16 deletions(-)
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AccessDelegationStrategy.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AccessDelegationStrategy.java
new file mode 100644
index 0000000000..b0daa9c0e1
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AccessDelegationStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported strategies for REST Catalog X-Iceberg-Access-Delegation request header
+ */
+enum AccessDelegationStrategy implements DescribedValue {
+ DISABLED("disabled", "Disabled", "Disables sending the X-Iceberg-Access-Delegation request header"),
+
+ VENDED_CREDENTIALS("vended-credentials", "Vended Credentials", "Request vended credentials from REST Catalog");
+
+ private final String value;
+
+ private final String displayName;
+
+ private final String description;
+
+ AccessDelegationStrategy(final String value, final String displayName, final String description) {
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTClientProvider.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTClientProvider.java
new file mode 100644
index 0000000000..5c2109c568
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTClientProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.rest.RESTClient;
+
+import java.util.Map;
+
+/**
+ * Abstraction for constructing an Apache Iceberg REST Client from provided properties
+ */
+@FunctionalInterface
+interface RESTClientProvider {
+ /**
+ * Build Iceberg REST Client using properties
+ *
+ * @param properties Configuration properties
+ * @return REST Client
+ */
+ RESTClient build(Map<String, String> properties);
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
index 1b3910b838..4c4a9e156e 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
@@ -23,9 +23,7 @@ import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
-import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
-import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -71,6 +69,14 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
.identifiesControllerService(IcebergFileIOProvider.class)
.build();
+ static final PropertyDescriptor ACCESS_DELEGATION_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Access Delegation Strategy")
+ .description("Strategy for requesting Access Delegation credentials from the Apache Iceberg REST Catalog")
+ .required(true)
+ .defaultValue(AccessDelegationStrategy.VENDED_CREDENTIALS)
+ .allowableValues(AccessDelegationStrategy.class)
+ .build();
+
static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
.name("Authentication Strategy")
.description("Strategy for authenticating with the Apache Iceberg Catalog over HTTP")
@@ -142,6 +148,7 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
CATALOG_URI,
FILE_IO_PROVIDER,
+ ACCESS_DELEGATION_STRATEGY,
AUTHENTICATION_STRATEGY,
BEARER_TOKEN,
AUTHORIZATION_SERVER_URI,
@@ -239,6 +246,12 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
final String catalogUri = context.getProperty(CATALOG_URI).getValue();
properties.put(CatalogProperties.URI, catalogUri);
+ // Set Access Delegation Header for REST Client building
+ final AccessDelegationStrategy accessDelegationStrategy = context.getProperty(ACCESS_DELEGATION_STRATEGY).asAllowableValue(AccessDelegationStrategy.class);
+ if (AccessDelegationStrategy.VENDED_CREDENTIALS == accessDelegationStrategy) {
+ properties.put(StandardRESTClientProvider.ICEBERG_ACCESS_DELEGATION_HEADER, accessDelegationStrategy.getValue());
+ }
+
final PropertyValue warehouseLocationProperty = context.getProperty(WAREHOUSE_LOCATION);
if (warehouseLocationProperty.isSet()) {
final String warehouseLocation = warehouseLocationProperty.getValue();
@@ -246,13 +259,10 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
}
final AuthenticationStrategy authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AuthenticationStrategy.class);
- final Function<Map<String, String>, RESTClient> restClientBuilder;
-
if (AuthenticationStrategy.BEARER == authenticationStrategy) {
final String bearerToken = context.getProperty(BEARER_TOKEN).getValue();
properties.put(OAuth2Properties.TOKEN, bearerToken);
properties.put(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_OAUTH2);
- restClientBuilder = this::buildStandardRestClient;
} else {
final String authorizationServerUri = context.getProperty(AUTHORIZATION_SERVER_URI).getValue();
properties.put(OAuth2Properties.OAUTH2_SERVER_URI, authorizationServerUri);
@@ -267,15 +277,12 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
final String clientSecret = context.getProperty(CLIENT_SECRET).getValue();
final String clientCredentials = CLIENT_CREDENTIALS_FORMAT.formatted(clientId, clientSecret);
properties.put(OAuth2Properties.CREDENTIAL, clientCredentials);
-
- restClientBuilder = configuration -> new CredentialsRefreshRESTClient(getLogger(), buildStandardRestClient(configuration), clientId, clientSecret);
- } else {
- restClientBuilder = this::buildStandardRestClient;
}
}
+ final RESTClientProvider restClientProvider = new StandardRESTClientProvider(getLogger());
final IcebergFileIOProvider icebergFileIoProvider = context.getProperty(FILE_IO_PROVIDER).asControllerService(IcebergFileIOProvider.class);
- return getSessionCatalog(restClientBuilder, icebergFileIoProvider, properties);
+ return getSessionCatalog(restClientProvider::build, icebergFileIoProvider, properties);
}
private RESTSessionCatalog getSessionCatalog(
@@ -296,12 +303,6 @@ public class RESTIcebergCatalog extends AbstractControllerService implements Ice
return restSessionCatalog;
}
- private RESTClient buildStandardRestClient(final Map<String, String> configuration) {
- final String uri = configuration.get(CatalogProperties.URI);
- // Set empty Authentication Session to avoid runtime exceptions in BaseHTTPClient class
- return HTTPClient.builder(configuration).uri(uri).withAuthSession(AuthSession.EMPTY).build();
- }
-
private Map<String, String> getDynamicProperties(final ConfigurationContext context) {
final Map<String, String> properties = new HashMap<>();
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProvider.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProvider.java
new file mode 100644
index 0000000000..afd50c26bf
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Standard implementation of REST Client Provider supporting configurable Access Delegation
+ */
+class StandardRESTClientProvider implements RESTClientProvider {
+ /** Access Delegation HTTP Request Header added in Apache Iceberg 1.5.0 */
+ static final String ICEBERG_ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation";
+
+ private static final Pattern CLIENT_CREDENTIALS_PATTERN = Pattern.compile("^([^:]+):(.+)$");
+
+ private static final int CLIENT_ID_GROUP = 1;
+
+ private static final int CLIENT_SECRET_GROUP = 2;
+
+ private final ComponentLog log;
+
+ StandardRESTClientProvider(final ComponentLog log) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ }
+
+ @Override
+ public RESTClient build(final Map<String, String> properties) {
+ Objects.requireNonNull(properties, "Properties required");
+ final String uri = properties.get(CatalogProperties.URI);
+
+ // Set empty Authentication Session to avoid runtime exceptions in BaseHTTPClient class
+ final HTTPClient.Builder builder = HTTPClient.builder(properties).uri(uri).withAuthSession(AuthSession.EMPTY);
+
+ if (properties.containsKey(ICEBERG_ACCESS_DELEGATION_HEADER)) {
+ final String accessDelegationHeader = properties.get(ICEBERG_ACCESS_DELEGATION_HEADER);
+ builder.withHeader(ICEBERG_ACCESS_DELEGATION_HEADER, accessDelegationHeader);
+ }
+
+ final RESTClient restClient;
+
+ if (properties.containsKey(OAuth2Properties.CREDENTIAL)) {
+ final String credential = properties.get(OAuth2Properties.CREDENTIAL);
+ final Matcher clientCredentialsMatcher = CLIENT_CREDENTIALS_PATTERN.matcher(credential);
+ if (clientCredentialsMatcher.matches()) {
+ final String clientId = clientCredentialsMatcher.group(CLIENT_ID_GROUP);
+ final String clientSecret = clientCredentialsMatcher.group(CLIENT_SECRET_GROUP);
+ restClient = new CredentialsRefreshRESTClient(log, builder.build(), clientId, clientSecret);
+ } else {
+ throw new IllegalStateException("OAuth 2 Client Credentials format not found");
+ }
+ } else {
+ restClient = builder.build();
+ }
+
+ return restClient;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/test/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProviderTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/test/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProviderTest.java
new file mode 100644
index 0000000000..821386fcf8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/test/java/org/apache/nifi/services/iceberg/catalog/StandardRESTClientProviderTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class StandardRESTClientProviderTest {
+
+ private static final String CATALOG_URI = "https://localhost";
+
+ private static final String CLIENT_CREDENTIALS = "CLIENT_ID:CLIENT_SECRET";
+
+ private static final ComponentLog COMPONENT_LOG = new MockComponentLog(StandardRESTClientProvider.class.getName(), StandardRESTClientProvider.class);
+
+ private final StandardRESTClientProvider provider = new StandardRESTClientProvider(COMPONENT_LOG);
+
+ @Test
+ void testBuild() {
+ final Map<String, String> properties = Map.of(CatalogProperties.URI, CATALOG_URI);
+
+ final RESTClient restClient = provider.build(properties);
+
+ assertNotNull(restClient);
+ }
+
+ @Test
+ void testBuildAccessDelegationHeader() {
+ final Map<String, String> properties = Map.of(
+ CatalogProperties.URI, CATALOG_URI,
+ StandardRESTClientProvider.ICEBERG_ACCESS_DELEGATION_HEADER, AccessDelegationStrategy.VENDED_CREDENTIALS.getValue()
+ );
+
+ final RESTClient restClient = provider.build(properties);
+
+ assertNotNull(restClient);
+ }
+
+ @Test
+ void testBuildClientCredentials() {
+ final Map<String, String> properties = Map.of(
+ CatalogProperties.URI, CATALOG_URI,
+ OAuth2Properties.CREDENTIAL, CLIENT_CREDENTIALS
+ );
+
+ final RESTClient restClient = provider.build(properties);
+
+ assertNotNull(restClient);
+ assertInstanceOf(CredentialsRefreshRESTClient.class, restClient);
+ }
+}