This is an automated email from the ASF dual-hosted git repository.

dhuo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new b2b1453e8 Modularize federation (Option 2) (#2332)
b2b1453e8 is described below

commit b2b1453e8cfaecc9cd3a0eb943fae6f48f287e4d
Author: Pooja Nilangekar <nilangekar.po...@gmail.com>
AuthorDate: Thu Aug 14 17:27:41 2025 -0400

    Modularize federation (Option 2) (#2332)
    
    * Modularize federation (Option 2)
    
    * Move polaris-extensions-federation-hadoop dependency
    
    * Change identifier to lowerCase
    
    * Change identifiers to constants
---
 extensions/federation/hadoop/build.gradle.kts      | 57 +++++++++++++++++++
 .../hadoop/HadoopFederatedCatalogFactory.java      | 61 ++++++++++++++++++++
 gradle/projects.main.properties                    |  1 +
 .../core/catalog/ExternalCatalogFactory.java       | 43 ++++++++++++++
 .../polaris/core/connection/ConnectionType.java    | 21 +++++++
 runtime/server/build.gradle.kts                    |  1 +
 .../catalog/iceberg/IcebergCatalogAdapter.java     | 11 +++-
 .../catalog/iceberg/IcebergCatalogHandler.java     | 66 +++++++---------------
 .../iceberg/IcebergRESTExternalCatalogFactory.java | 65 +++++++++++++++++++++
 .../catalog/IcebergCatalogHandlerAuthzTest.java    | 19 ++++++-
 .../org/apache/polaris/service/TestServices.java   | 10 +++-
 11 files changed, 303 insertions(+), 52 deletions(-)

diff --git a/extensions/federation/hadoop/build.gradle.kts 
b/extensions/federation/hadoop/build.gradle.kts
new file mode 100644
index 000000000..431da94e5
--- /dev/null
+++ b/extensions/federation/hadoop/build.gradle.kts
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("polaris-client")
+  alias(libs.plugins.jandex)
+}
+
+dependencies {
+  // Polaris dependencies
+  implementation(project(":polaris-core"))
+
+  implementation(platform(libs.iceberg.bom))
+  implementation("org.apache.iceberg:iceberg-api")
+  implementation("org.apache.iceberg:iceberg-core")
+  implementation("org.apache.iceberg:iceberg-common")
+
+  // Hadoop dependencies (for Hadoop catalog support)
+  implementation(libs.hadoop.common) {
+    exclude("org.slf4j", "slf4j-reload4j")
+    exclude("org.slf4j", "slf4j-log4j12")
+    exclude("ch.qos.reload4j", "reload4j")
+    exclude("log4j", "log4j")
+    exclude("org.apache.zookeeper", "zookeeper")
+    exclude("org.apache.hadoop.thirdparty", "hadoop-shaded-protobuf_3_25")
+    exclude("com.github.pjfanning", "jersey-json")
+    exclude("com.sun.jersey", "jersey-core")
+    exclude("com.sun.jersey", "jersey-server")
+    exclude("com.sun.jersey", "jersey-servlet")
+    exclude("io.dropwizard.metrics", "metrics-core")
+  }
+  implementation(libs.hadoop.client.api)
+  implementation(libs.hadoop.client.runtime)
+
+  // CDI dependencies for runtime discovery
+  implementation(libs.jakarta.enterprise.cdi.api)
+  implementation(libs.smallrye.common.annotation)
+
+  // Logging
+  implementation(libs.slf4j.api)
+}
diff --git 
a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java
 
b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java
new file mode 100644
index 000000000..50294da99
--- /dev/null
+++ 
b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.extensions.federation.hadoop;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.polaris.core.catalog.ExternalCatalogFactory;
+import org.apache.polaris.core.connection.AuthenticationParametersDpo;
+import org.apache.polaris.core.connection.AuthenticationType;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.ConnectionType;
+import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
+import org.apache.polaris.core.secrets.UserSecretsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Factory class for creating a Hadoop catalog handle based on connection 
configuration. */
+@ApplicationScoped
+@Identifier(ConnectionType.HADOOP_FACTORY_IDENTIFIER)
+public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class);
+
+  @Override
+  public Catalog createCatalog(
+      ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager 
userSecretsManager) {
+    // Currently, Polaris supports Hadoop federation only via IMPLICIT 
authentication.
+    // Hence, prior to initializing the configuration, ensure that the catalog 
uses
+    // IMPLICIT authentication.
+    AuthenticationParametersDpo authenticationParametersDpo =
+        connectionConfigInfoDpo.getAuthenticationParameters();
+    if (authenticationParametersDpo.getAuthenticationTypeCode()
+        != AuthenticationType.IMPLICIT.getCode()) {
+      throw new IllegalStateException("Hadoop federation only supports 
IMPLICIT authentication.");
+    }
+    Configuration conf = new Configuration();
+    String warehouse = ((HadoopConnectionConfigInfoDpo) 
connectionConfigInfoDpo).getWarehouse();
+    HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse);
+    hadoopCatalog.initialize(
+        warehouse, 
connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
+    return hadoopCatalog;
+  }
+}
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 39ab22741..1b74232b5 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -42,6 +42,7 @@ polaris-minio-testcontainer=tools/minio-testcontainer
 polaris-version=tools/version
 polaris-misc-types=tools/misc-types
 polaris-persistence-varint=nosql/persistence/varint
+polaris-extensions-federation-hadoop=extensions/federation/hadoop
 
 polaris-config-docs-annotations=tools/config-docs/annotations
 polaris-config-docs-generator=tools/config-docs/generator
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java
new file mode 100644
index 000000000..59c890375
--- /dev/null
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.catalog;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.secrets.UserSecretsManager;
+
+/**
+ * Factory interface for creating external catalog handles based on connection 
configuration.
+ *
+ * <p>Implementations should be annotated with CDI annotations and use the 
@Identifier annotation to
+ * specify which connection type they support.
+ */
+public interface ExternalCatalogFactory {
+
+  /**
+   * Creates a catalog handle for the given connection configuration.
+   *
+   * @param connectionConfig the connection configuration
+   * @param userSecretsManager the user secrets manager for handling 
credentials
+   * @return the initialized catalog
+   * @throws IllegalStateException if the connection configuration is invalid
+   */
+  Catalog createCatalog(
+      ConnectionConfigInfoDpo connectionConfig, UserSecretsManager 
userSecretsManager);
+}
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
index 441c0c4c5..7c5092c43 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
@@ -35,6 +35,9 @@ public enum ConnectionType {
   HADOOP(2),
   ;
 
+  public static final String ICEBERG_REST_FACTORY_IDENTIFIER = "iceberg_rest";
+  public static final String HADOOP_FACTORY_IDENTIFIER = "hadoop";
+
   private static final ConnectionType[] REVERSE_MAPPING_ARRAY;
 
   static {
@@ -77,4 +80,22 @@ public enum ConnectionType {
   public int getCode() {
     return this.code;
   }
+
+  /**
+   * Get the factory identifier string used for CDI injection of the 
appropriate
+   * ExternalCatalogFactory.
+   *
+   * @return the factory identifier string
+   */
+  public String getFactoryIdentifier() {
+    switch (this) {
+      case ICEBERG_REST:
+        return ICEBERG_REST_FACTORY_IDENTIFIER;
+      case HADOOP:
+        return HADOOP_FACTORY_IDENTIFIER;
+      default:
+        throw new UnsupportedOperationException(
+            "No factory identifier for connection type: " + this);
+    }
+  }
 }
diff --git a/runtime/server/build.gradle.kts b/runtime/server/build.gradle.kts
index 2ebd15381..c645e0bc0 100644
--- a/runtime/server/build.gradle.kts
+++ b/runtime/server/build.gradle.kts
@@ -48,6 +48,7 @@ dependencies {
   runtimeOnly("org.postgresql:postgresql")
   runtimeOnly(project(":polaris-relational-jdbc"))
   runtimeOnly("io.quarkus:quarkus-jdbc-postgresql")
+  runtimeOnly(project(":polaris-extensions-federation-hadoop"))
 
   // enforce the Quarkus _platform_ here, to get a consistent and validated 
set of dependencies
   implementation(enforcedPlatform(libs.quarkus.bom))
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 5023f99f7..13767c794 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import jakarta.enterprise.context.RequestScoped;
+import jakarta.enterprise.inject.Any;
+import jakarta.enterprise.inject.Instance;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.WebApplicationException;
 import jakarta.ws.rs.core.HttpHeaders;
@@ -61,6 +63,7 @@ import 
org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.catalog.ExternalCatalogFactory;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.context.RealmContext;
@@ -146,6 +149,7 @@ public class IcebergCatalogAdapter
   private final CatalogPrefixParser prefixParser;
   private final ReservedProperties reservedProperties;
   private final CatalogHandlerUtils catalogHandlerUtils;
+  private final Instance<ExternalCatalogFactory> externalCatalogFactories;
 
   @Inject
   public IcebergCatalogAdapter(
@@ -159,7 +163,8 @@ public class IcebergCatalogAdapter
       PolarisAuthorizer polarisAuthorizer,
       CatalogPrefixParser prefixParser,
       ReservedProperties reservedProperties,
-      CatalogHandlerUtils catalogHandlerUtils) {
+      CatalogHandlerUtils catalogHandlerUtils,
+      @Any Instance<ExternalCatalogFactory> externalCatalogFactories) {
     this.realmContext = realmContext;
     this.callContext = callContext;
     this.realmConfig = callContext.getRealmConfig();
@@ -172,6 +177,7 @@ public class IcebergCatalogAdapter
     this.prefixParser = prefixParser;
     this.reservedProperties = reservedProperties;
     this.catalogHandlerUtils = catalogHandlerUtils;
+    this.externalCatalogFactories = externalCatalogFactories;
   }
 
   /**
@@ -208,7 +214,8 @@ public class IcebergCatalogAdapter
         catalogName,
         polarisAuthorizer,
         reservedProperties,
-        catalogHandlerUtils);
+        catalogHandlerUtils,
+        externalCatalogFactories);
   }
 
   @Override
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 4395467d1..266ac11e4 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -20,7 +20,9 @@ package org.apache.polaris.service.catalog.iceberg;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import io.smallrye.common.annotation.Identifier;
 import jakarta.annotation.Nonnull;
+import jakarta.enterprise.inject.Instance;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.Closeable;
 import java.time.OffsetDateTime;
@@ -33,7 +35,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.MetadataUpdate;
@@ -46,7 +47,6 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.UpdateRequirement;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SessionCatalog;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
@@ -55,9 +55,6 @@ import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.rest.HTTPClient;
-import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.credentials.ImmutableCredential;
 import org.apache.iceberg.rest.requests.CommitTransactionRequest;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
@@ -76,13 +73,10 @@ import org.apache.iceberg.rest.responses.LoadViewResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
 import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.catalog.ExternalCatalogFactory;
 import org.apache.polaris.core.config.FeatureConfiguration;
-import org.apache.polaris.core.connection.AuthenticationParametersDpo;
-import org.apache.polaris.core.connection.AuthenticationType;
 import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
 import org.apache.polaris.core.connection.ConnectionType;
-import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
-import 
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -132,6 +126,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
   private final ReservedProperties reservedProperties;
   private final CatalogHandlerUtils catalogHandlerUtils;
 
+  private final Instance<ExternalCatalogFactory> externalCatalogFactories;
+
   // Catalog instance will be initialized after authorizing resolver 
successfully resolves
   // the catalog entity.
   protected Catalog baseCatalog = null;
@@ -151,13 +147,15 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       String catalogName,
       PolarisAuthorizer authorizer,
       ReservedProperties reservedProperties,
-      CatalogHandlerUtils catalogHandlerUtils) {
+      CatalogHandlerUtils catalogHandlerUtils,
+      Instance<ExternalCatalogFactory> externalCatalogFactories) {
     super(callContext, resolutionManifestFactory, securityContext, 
catalogName, authorizer);
     this.metaStoreManager = metaStoreManager;
     this.userSecretsManager = userSecretsManager;
     this.catalogFactory = catalogFactory;
     this.reservedProperties = reservedProperties;
     this.catalogHandlerUtils = catalogHandlerUtils;
+    this.externalCatalogFactories = externalCatalogFactories;
   }
 
   /**
@@ -220,42 +218,18 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       ConnectionType connectionType =
           
ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode());
 
-      switch (connectionType) {
-        case ICEBERG_REST:
-          SessionCatalog.SessionContext context = 
SessionCatalog.SessionContext.createEmpty();
-          federatedCatalog =
-              new RESTCatalog(
-                  context,
-                  (config) ->
-                      HTTPClient.builder(config)
-                          
.uri(config.get(org.apache.iceberg.CatalogProperties.URI))
-                          .build());
-          federatedCatalog.initialize(
-              ((IcebergRestConnectionConfigInfoDpo) 
connectionConfigInfoDpo).getRemoteCatalogName(),
-              
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
-          break;
-        case HADOOP:
-          // Currently, Polaris supports Hadoop federation only via IMPLICIT 
authentication.
-          // Hence, prior to initializing the configuration, ensure that the 
catalog uses
-          // IMPLICIT authentication.
-          AuthenticationParametersDpo authenticationParametersDpo =
-              connectionConfigInfoDpo.getAuthenticationParameters();
-          if (authenticationParametersDpo.getAuthenticationTypeCode()
-              != AuthenticationType.IMPLICIT.getCode()) {
-            throw new IllegalStateException(
-                "Hadoop federation only supports IMPLICIT authentication.");
-          }
-          Configuration conf = new Configuration();
-          String warehouse =
-              ((HadoopConnectionConfigInfoDpo) 
connectionConfigInfoDpo).getWarehouse();
-          federatedCatalog = new HadoopCatalog(conf, warehouse);
-          federatedCatalog.initialize(
-              warehouse,
-              
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Connection type not supported: " + connectionType);
+      // Use the unified factory pattern for all external catalog types
+      Instance<ExternalCatalogFactory> externalCatalogFactory =
+          externalCatalogFactories.select(
+              Identifier.Literal.of(connectionType.getFactoryIdentifier()));
+      if (externalCatalogFactory.isResolvable()) {
+        federatedCatalog =
+            externalCatalogFactory
+                .get()
+                .createCatalog(connectionConfigInfoDpo, 
getUserSecretsManager());
+      } else {
+        throw new UnsupportedOperationException(
+            "External catalog factory for type '" + connectionType + "' is 
unavailable.");
       }
       this.baseCatalog = federatedCatalog;
     } else {
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java
new file mode 100644
index 000000000..05de201c3
--- /dev/null
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.iceberg;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.polaris.core.catalog.ExternalCatalogFactory;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.ConnectionType;
+import 
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
+import org.apache.polaris.core.secrets.UserSecretsManager;
+
+/** Factory class for creating an Iceberg REST catalog handle based on 
connection configuration. */
+@ApplicationScoped
+@Identifier(ConnectionType.ICEBERG_REST_FACTORY_IDENTIFIER)
+public class IcebergRESTExternalCatalogFactory implements 
ExternalCatalogFactory {
+
+  @Override
+  public Catalog createCatalog(
+      ConnectionConfigInfoDpo connectionConfig, UserSecretsManager 
userSecretsManager) {
+    if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo)) {
+      throw new IllegalArgumentException(
+          "Expected IcebergRestConnectionConfigInfoDpo but got: "
+              + connectionConfig.getClass().getSimpleName());
+    }
+
+    IcebergRestConnectionConfigInfoDpo icebergConfig =
+        (IcebergRestConnectionConfigInfoDpo) connectionConfig;
+
+    SessionCatalog.SessionContext context = 
SessionCatalog.SessionContext.createEmpty();
+    RESTCatalog federatedCatalog =
+        new RESTCatalog(
+            context,
+            (config) ->
+                HTTPClient.builder(config)
+                    .uri(config.get(org.apache.iceberg.CatalogProperties.URI))
+                    .build());
+
+    federatedCatalog.initialize(
+        icebergConfig.getRemoteCatalogName(),
+        connectionConfig.asIcebergCatalogProperties(userSecretsManager));
+
+    return federatedCatalog;
+  }
+}
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
index ea963784b..90d7341a9 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
@@ -21,6 +21,7 @@ package org.apache.polaris.service.catalog;
 import com.google.common.collect.ImmutableMap;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.TestProfile;
+import jakarta.enterprise.inject.Instance;
 import jakarta.ws.rs.core.SecurityContext;
 import java.time.Instant;
 import java.util.List;
@@ -55,6 +56,7 @@ import 
org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.PrincipalWithCredentialsCredentials;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.catalog.ExternalCatalogFactory;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.CatalogRoleEntity;
@@ -79,6 +81,14 @@ import org.mockito.Mockito;
 @TestProfile(PolarisAuthzTestBase.Profile.class)
 public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase {
 
+  @SuppressWarnings("unchecked")
+  private static Instance<ExternalCatalogFactory> 
emptyExternalCatalogFactory() {
+    Instance<ExternalCatalogFactory> mock = Mockito.mock(Instance.class);
+    Mockito.when(mock.select(Mockito.any())).thenReturn(mock);
+    Mockito.when(mock.isUnsatisfied()).thenReturn(true);
+    return mock;
+  }
+
   private IcebergCatalogHandler newWrapper() {
     return newWrapper(Set.of());
   }
@@ -101,7 +111,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
         catalogName,
         polarisAuthorizer,
         reservedProperties,
-        catalogHandlerUtils);
+        catalogHandlerUtils,
+        emptyExternalCatalogFactory());
   }
 
   /**
@@ -242,7 +253,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             CATALOG_NAME,
             polarisAuthorizer,
             reservedProperties,
-            catalogHandlerUtils);
+            catalogHandlerUtils,
+            emptyExternalCatalogFactory());
 
     // a variety of actions are all disallowed because the principal's 
credentials must be rotated
     doTestInsufficientPrivileges(
@@ -277,7 +289,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             CATALOG_NAME,
             polarisAuthorizer,
             reservedProperties,
-            catalogHandlerUtils);
+            catalogHandlerUtils,
+            emptyExternalCatalogFactory());
 
     doTestSufficientPrivilegeSets(
         List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)),
diff --git 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index d8ec77788..e7288c441 100644
--- 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -22,6 +22,7 @@ import com.google.auth.oauth2.AccessToken;
 import com.google.auth.oauth2.GoogleCredentials;
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import jakarta.enterprise.inject.Instance;
 import jakarta.ws.rs.core.SecurityContext;
 import java.security.Principal;
 import java.time.Clock;
@@ -36,6 +37,7 @@ import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.catalog.ExternalCatalogFactory;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.context.RealmContext;
@@ -215,6 +217,11 @@ public record TestServices(
       CatalogHandlerUtils catalogHandlerUtils =
           new CatalogHandlerUtils(callContext.getRealmConfig());
 
+      @SuppressWarnings("unchecked")
+      Instance<ExternalCatalogFactory> externalCatalogFactory = 
Mockito.mock(Instance.class);
+      
Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory);
+      Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true);
+
       IcebergCatalogAdapter catalogService =
           new IcebergCatalogAdapter(
               realmContext,
@@ -227,7 +234,8 @@ public record TestServices(
               authorizer,
               new DefaultCatalogPrefixParser(),
               reservedProperties,
-              catalogHandlerUtils);
+              catalogHandlerUtils,
+              externalCatalogFactory);
 
       IcebergRestCatalogApi restApi = new 
IcebergRestCatalogApi(catalogService);
       IcebergRestConfigurationApi restConfigurationApi =

Reply via email to