This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 316b9ae1 Revert "Remove CallContextCatalogFactory (#592)" (#943)
316b9ae1 is described below

commit 316b9ae1f60254d5631e36433151232640cadbfb
Author: Eric Maynard <[email protected]>
AuthorDate: Tue Feb 4 13:46:28 2025 -0800

    Revert "Remove CallContextCatalogFactory (#592)" (#943)
    
    * Revert "Remove CallContextCatalogFactory (#592)"
    
    This reverts commit 8a46a451d6207d524cc423e815a119ec320635be.
    
    * fix conflicts
    
    * spotless
---
 .../quarkus/admin/PolarisAuthzTestBase.java        |  62 ++++++++++-
 .../PolarisCatalogHandlerWrapperAuthzTest.java     | 121 +++++++++++++-------
 .../service/catalog/IcebergCatalogAdapter.java     |  19 ++--
 .../catalog/PolarisCatalogHandlerWrapper.java      |  65 ++---------
 .../service/context/CallContextCatalogFactory.java |  33 ++++++
 .../context/PolarisCallContextCatalogFactory.java  | 123 +++++++++++++++++++++
 .../org/apache/polaris/service/TestServices.java   |  18 ++-
 7 files changed, 328 insertions(+), 113 deletions(-)

diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 730572af..c51081fc 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableMap;
 import io.quarkus.test.junit.QuarkusMock;
 import io.quarkus.test.junit.QuarkusTestProfile;
 import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.enterprise.inject.Alternative;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.IOException;
@@ -37,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ForbiddenException;
@@ -65,6 +68,7 @@ import 
org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.apache.polaris.core.persistence.PolarisEntityManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.BasePolarisCatalog;
 import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
@@ -72,7 +76,9 @@ import 
org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.config.DefaultConfigurationStore;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.apache.polaris.service.task.TaskExecutor;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -85,6 +91,11 @@ public abstract class PolarisAuthzTestBase {
 
   public static class Profile implements QuarkusTestProfile {
 
+    @Override
+    public Set<Class<?>> getEnabledAlternatives() {
+      return Set.of(TestPolarisCallContextCatalogFactory.class);
+    }
+
     @Override
     public Map<String, String> getConfigOverrides() {
       return Map.of(
@@ -221,8 +232,6 @@ public abstract class PolarisAuthzTestBase {
                 .setName(CATALOG_NAME)
                 .setCatalogType("INTERNAL")
                 .setDefaultBaseLocation(storageLocation)
-                .addProperty(
-                    CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO")
                 .setStorageConfigurationInfo(storageConfigModel, 
storageLocation)
                 .build());
 
@@ -323,7 +332,7 @@ public abstract class PolarisAuthzTestBase {
     Mockito.when(securityContext.getUserPrincipal()).thenReturn(p);
     Set<String> principalRoleNames = loadPrincipalRolesNames(p);
     Mockito.when(securityContext.isUserInRole(Mockito.anyString()))
-        .thenAnswer(invocation -> principalRoleNames.contains((String) 
invocation.getArgument(0)));
+        .thenAnswer(invocation -> 
principalRoleNames.contains(invocation.getArgument(0)));
     return securityContext;
   }
 
@@ -407,6 +416,53 @@ public abstract class PolarisAuthzTestBase {
             CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
   }
 
+  @Alternative
+  @RequestScoped
+  public static class TestPolarisCallContextCatalogFactory
+      extends PolarisCallContextCatalogFactory {
+
+    public TestPolarisCallContextCatalogFactory() {
+      super(null, null, null, null, null, null, null);
+    }
+
+    @Inject
+    public TestPolarisCallContextCatalogFactory(
+        PolarisEntityManager entityManager,
+        PolarisMetaStoreManager metaStoreManager,
+        PolarisMetaStoreSession metaStoreSession,
+        PolarisConfigurationStore configurationStore,
+        PolarisDiagnostics diagnostics,
+        TaskExecutor taskExecutor,
+        FileIOFactory fileIOFactory) {
+      super(
+          entityManager,
+          metaStoreManager,
+          metaStoreSession,
+          configurationStore,
+          diagnostics,
+          taskExecutor,
+          fileIOFactory);
+    }
+
+    @Override
+    public Catalog createCallContextCatalog(
+        RealmContext realmContext,
+        AuthenticatedPolarisPrincipal authenticatedPolarisPrincipal,
+        SecurityContext securityContext,
+        final PolarisResolutionManifest resolvedManifest) {
+      // This depends on the BasePolarisCatalog allowing calling initialize 
multiple times
+      // to override the previous config.
+      Catalog catalog =
+          super.createCallContextCatalog(
+              realmContext, authenticatedPolarisPrincipal, securityContext, 
resolvedManifest);
+      catalog.initialize(
+          CATALOG_NAME,
+          ImmutableMap.of(
+              CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+      return catalog;
+    }
+  }
+
   /**
    * Tests each "sufficient" privilege individually by invoking {@code 
grantAction} for each set of
    * privileges, running the action being tested, revoking after each test 
set, and also ensuring
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
index 507ac1af..6ba5cef1 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.polaris.service.quarkus.catalog;
 
+import com.google.common.collect.ImmutableMap;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.TestProfile;
 import jakarta.ws.rs.core.SecurityContext;
@@ -33,6 +34,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ForbiddenException;
@@ -52,12 +54,17 @@ import 
org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.PrincipalWithCredentialsCredentials;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.CatalogRoleEntity;
 import org.apache.polaris.core.entity.PolarisPrivilege;
 import org.apache.polaris.core.entity.PrincipalEntity;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
 import org.apache.polaris.service.catalog.PolarisCatalogHandlerWrapper;
+import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
 import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
 import org.apache.polaris.service.types.NotificationRequest;
 import org.apache.polaris.service.types.NotificationType;
@@ -87,24 +94,27 @@ public class PolarisCatalogHandlerWrapperAuthzTest extends 
PolarisAuthzTestBase
   }
 
   private PolarisCatalogHandlerWrapper newWrapper(Set<String> 
activatedPrincipalRoles) {
-    return newWrapper(activatedPrincipalRoles, CATALOG_NAME);
+    return newWrapper(activatedPrincipalRoles, CATALOG_NAME, 
newCatalogFactory());
   }
 
   private PolarisCatalogHandlerWrapper newWrapper(
-      Set<String> activatedPrincipalRoles, String catalogName) {
+      Set<String> activatedPrincipalRoles, String catalogName, 
CallContextCatalogFactory factory) {
     final AuthenticatedPolarisPrincipal authenticatedPrincipal =
         new AuthenticatedPolarisPrincipal(principalEntity, 
activatedPrincipalRoles);
-    SecurityContext securityContext =
-        securityContext(authenticatedPrincipal, activatedPrincipalRoles);
-    return newWrapper(securityContext, catalogName);
+    return new PolarisCatalogHandlerWrapper(
+        realmContext,
+        metaStoreSession,
+        configurationStore,
+        diagServices,
+        entityManager,
+        metaStoreManager,
+        securityContext(authenticatedPrincipal, activatedPrincipalRoles),
+        factory,
+        catalogName,
+        polarisAuthorizer);
   }
 
   private PolarisCatalogHandlerWrapper newWrapper(SecurityContext 
securityContext) {
-    return newWrapper(securityContext, CATALOG_NAME);
-  }
-
-  private PolarisCatalogHandlerWrapper newWrapper(
-      SecurityContext securityContext, String catalogName) {
     return new PolarisCatalogHandlerWrapper(
         realmContext,
         metaStoreSession,
@@ -113,8 +123,18 @@ public class PolarisCatalogHandlerWrapperAuthzTest extends 
PolarisAuthzTestBase
         entityManager,
         metaStoreManager,
         securityContext,
-        catalogName,
-        polarisAuthorizer,
+        newCatalogFactory(),
+        CATALOG_NAME,
+        polarisAuthorizer);
+  }
+
+  private CallContextCatalogFactory newCatalogFactory() {
+    return new TestPolarisCallContextCatalogFactory(
+        entityManager,
+        metaStoreManager,
+        metaStoreSession,
+        configurationStore,
+        diagServices,
         Mockito.mock(),
         fileIOFactory);
   }
@@ -1621,8 +1641,6 @@ public class PolarisCatalogHandlerWrapperAuthzTest 
extends PolarisAuthzTestBase
             .setName(externalCatalog)
             .setDefaultBaseLocation(storageLocation)
             .setStorageConfigurationInfo(storageConfigModel, storageLocation)
-            .addProperty(
-                CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO")
             .setCatalogType("EXTERNAL")
             .build());
     adminService.createCatalogRole(
@@ -1685,23 +1703,49 @@ public class PolarisCatalogHandlerWrapperAuthzTest 
extends PolarisAuthzTestBase
     validatePayload.setTimestamp(530950845L);
     validateRequest.setPayload(validatePayload);
 
-    try (FileIO fileIO =
-        CatalogUtil.loadFileIO(
-            "org.apache.iceberg.inmemory.InMemoryFileIO", Map.of(), new 
Configuration())) {
-      TableMetadata tableMetadata =
-          TableMetadata.buildFromEmpty()
-              .addSchema(SCHEMA, SCHEMA.highestFieldId())
-              .setLocation(
-                  String.format("%s/bucket/table/metadata/v1.metadata.json", 
storageLocation))
-              .addPartitionSpec(PartitionSpec.unpartitioned())
-              .addSortOrder(SortOrder.unsorted())
-              .assignUUID()
-              .build();
-      TableMetadataParser.overwrite(
-          tableMetadata, 
fileIO.newOutputFile(createPayload.getMetadataLocation()));
-      TableMetadataParser.overwrite(
-          tableMetadata, 
fileIO.newOutputFile(updatePayload.getMetadataLocation()));
-    }
+    PolarisCallContextCatalogFactory factory =
+        new PolarisCallContextCatalogFactory(
+            entityManager,
+            metaStoreManager,
+            metaStoreSession,
+            configurationStore,
+            diagServices,
+            Mockito.mock(),
+            new DefaultFileIOFactory(
+                realmEntityManagerFactory, managerFactory, 
configurationStore)) {
+          @Override
+          public Catalog createCallContextCatalog(
+              RealmContext realmContext,
+              AuthenticatedPolarisPrincipal authenticatedPolarisPrincipal,
+              SecurityContext securityContext,
+              PolarisResolutionManifest resolvedManifest) {
+            Catalog catalog =
+                super.createCallContextCatalog(
+                    realmContext, authenticatedPolarisPrincipal, 
securityContext, resolvedManifest);
+            String fileIoImpl = "org.apache.iceberg.inmemory.InMemoryFileIO";
+            catalog.initialize(
+                externalCatalog, 
ImmutableMap.of(CatalogProperties.FILE_IO_IMPL, fileIoImpl));
+
+            try (FileIO fileIO =
+                CatalogUtil.loadFileIO(fileIoImpl, Map.of(), new 
Configuration())) {
+              TableMetadata tableMetadata =
+                  TableMetadata.buildFromEmpty()
+                      .addSchema(SCHEMA, SCHEMA.highestFieldId())
+                      .setLocation(
+                          String.format(
+                              "%s/bucket/table/metadata/v1.metadata.json", 
storageLocation))
+                      .addPartitionSpec(PartitionSpec.unpartitioned())
+                      .addSortOrder(SortOrder.unsorted())
+                      .assignUUID()
+                      .build();
+              TableMetadataParser.overwrite(
+                  tableMetadata, 
fileIO.newOutputFile(createPayload.getMetadataLocation()));
+              TableMetadataParser.overwrite(
+                  tableMetadata, 
fileIO.newOutputFile(updatePayload.getMetadataLocation()));
+            }
+            return catalog;
+          }
+        };
 
     List<Set<PolarisPrivilege>> sufficientPrivilegeSets =
         List.of(
@@ -1725,18 +1769,19 @@ public class PolarisCatalogHandlerWrapperAuthzTest 
extends PolarisAuthzTestBase
     doTestSufficientPrivilegeSets(
         sufficientPrivilegeSets,
         () -> {
-          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
               .sendNotification(table, createRequest);
-          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
               .sendNotification(table, updateRequest);
-          newWrapper(Set.of(PRINCIPAL_ROLE1), 
externalCatalog).sendNotification(table, dropRequest);
-          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
+              .sendNotification(table, dropRequest);
+          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
               .sendNotification(table, validateRequest);
         },
         () -> {
-          newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog)
+          newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog, factory)
               .dropNamespace(Namespace.of("extns1", "extns2"));
-          newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog)
+          newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog, factory)
               .dropNamespace(Namespace.of("extns1"));
         },
         PRINCIPAL_NAME,
@@ -1746,7 +1791,7 @@ public class PolarisCatalogHandlerWrapperAuthzTest 
extends PolarisAuthzTestBase
     doTestSufficientPrivilegeSets(
         sufficientPrivilegeSets,
         () -> {
-          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+          newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
               .sendNotification(table, validateRequest);
         },
         null /* cleanupAction */,
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
index a46b2af0..4998857e 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
@@ -68,8 +68,7 @@ import org.apache.polaris.core.persistence.resolver.Resolver;
 import org.apache.polaris.core.persistence.resolver.ResolverStatus;
 import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
 import 
org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
-import org.apache.polaris.service.catalog.io.FileIOFactory;
-import org.apache.polaris.service.task.TaskExecutor;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
 import org.apache.polaris.service.types.CommitTableRequest;
 import org.apache.polaris.service.types.CommitViewRequest;
 import org.apache.polaris.service.types.NotificationRequest;
@@ -120,35 +119,32 @@ public class IcebergCatalogAdapter
           .build();
 
   private final RealmContext realmContext;
+  private final CallContextCatalogFactory catalogFactory;
   private final PolarisMetaStoreManager metaStoreManager;
   private final PolarisEntityManager entityManager;
   private final PolarisMetaStoreSession session;
   private final PolarisConfigurationStore configurationStore;
   private final PolarisDiagnostics diagnostics;
   private final PolarisAuthorizer polarisAuthorizer;
-  private final TaskExecutor taskExecutor;
-  private final FileIOFactory fileIOFactory;
 
   @Inject
   public IcebergCatalogAdapter(
       RealmContext realmContext,
+      CallContextCatalogFactory catalogFactory,
       PolarisEntityManager entityManager,
       PolarisMetaStoreManager metaStoreManager,
       PolarisMetaStoreSession session,
       PolarisConfigurationStore configurationStore,
       PolarisDiagnostics diagnostics,
-      PolarisAuthorizer polarisAuthorizer,
-      TaskExecutor taskExecutor,
-      FileIOFactory fileIOFactory) {
+      PolarisAuthorizer polarisAuthorizer) {
     this.realmContext = realmContext;
+    this.catalogFactory = catalogFactory;
     this.entityManager = entityManager;
     this.metaStoreManager = metaStoreManager;
     this.session = session;
     this.configurationStore = configurationStore;
     this.diagnostics = diagnostics;
     this.polarisAuthorizer = polarisAuthorizer;
-    this.taskExecutor = taskExecutor;
-    this.fileIOFactory = fileIOFactory;
   }
 
   /**
@@ -186,10 +182,9 @@ public class IcebergCatalogAdapter
         entityManager,
         metaStoreManager,
         securityContext,
+        catalogFactory,
         catalogName,
-        polarisAuthorizer,
-        taskExecutor,
-        fileIOFactory);
+        polarisAuthorizer);
   }
 
   @Override
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
index 26befced..95c8c485 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
@@ -20,26 +20,21 @@ package org.apache.polaris.service.catalog;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import jakarta.annotation.Nonnull;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.Paths;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SortOrder;
@@ -84,7 +79,6 @@ import org.apache.polaris.core.auth.PolarisAuthorizer;
 import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
 import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.CatalogEntity;
-import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.PolarisEntityManager;
@@ -96,8 +90,7 @@ import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
 import org.apache.polaris.core.persistence.resolver.ResolverPath;
 import org.apache.polaris.core.persistence.resolver.ResolverStatus;
 import org.apache.polaris.core.storage.PolarisStorageActions;
-import org.apache.polaris.service.catalog.io.FileIOFactory;
-import org.apache.polaris.service.task.TaskExecutor;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
 import org.apache.polaris.service.types.NotificationRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,8 +123,7 @@ public class PolarisCatalogHandlerWrapper implements 
AutoCloseable {
   private final AuthenticatedPolarisPrincipal authenticatedPrincipal;
   private final SecurityContext securityContext;
   private final PolarisAuthorizer authorizer;
-  private final TaskExecutor taskExecutor;
-  private final FileIOFactory fileIOFactory;
+  private final CallContextCatalogFactory catalogFactory;
 
   // Initialized in the authorize methods.
   private PolarisResolutionManifest resolutionManifest = null;
@@ -150,10 +142,9 @@ public class PolarisCatalogHandlerWrapper implements 
AutoCloseable {
       PolarisEntityManager entityManager,
       PolarisMetaStoreManager metaStoreManager,
       SecurityContext securityContext,
+      CallContextCatalogFactory catalogFactory,
       String catalogName,
-      PolarisAuthorizer authorizer,
-      TaskExecutor taskExecutor,
-      FileIOFactory fileIOFactory) {
+      PolarisAuthorizer authorizer) {
     this.realmContext = realmContext;
     this.session = session;
     this.entityManager = entityManager;
@@ -171,8 +162,7 @@ public class PolarisCatalogHandlerWrapper implements 
AutoCloseable {
     this.authenticatedPrincipal =
         (AuthenticatedPolarisPrincipal) securityContext.getUserPrincipal();
     this.authorizer = authorizer;
-    this.taskExecutor = taskExecutor;
-    this.fileIOFactory = fileIOFactory;
+    this.catalogFactory = catalogFactory;
   }
 
   /**
@@ -204,53 +194,14 @@ public class PolarisCatalogHandlerWrapper implements 
AutoCloseable {
   }
 
   private void initializeCatalog() {
-    PolarisBaseEntity baseCatalogEntity =
-        
resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity();
-    CatalogEntity catalog = CatalogEntity.of(baseCatalogEntity);
-
-    String realm = realmContext.getRealmIdentifier();
-    String catalogKey = realm + "/" + catalogName;
-    LOGGER.info("Initializing new BasePolarisCatalog for key: {}", catalogKey);
-
-    Map<String, String> catalogProperties = new 
HashMap<>(catalog.getPropertiesAsMap());
-    String defaultBaseLocation = catalog.getDefaultBaseLocation();
-    LOGGER.info("Looked up defaultBaseLocation {} for catalog {}", 
defaultBaseLocation, catalogKey);
-    catalogProperties.put(
-        CatalogProperties.WAREHOUSE_LOCATION,
-        Objects.requireNonNullElseGet(
-            defaultBaseLocation,
-            () -> Paths.get(WAREHOUSE_LOCATION_BASEDIR, 
catalogKey).toString()));
-
-    this.baseCatalog = createBasePolarisCatalog(catalogProperties);
+    this.baseCatalog =
+        catalogFactory.createCallContextCatalog(
+            realmContext, authenticatedPrincipal, securityContext, 
resolutionManifest);
     this.namespaceCatalog =
         (baseCatalog instanceof SupportsNamespaces) ? (SupportsNamespaces) 
baseCatalog : null;
     this.viewCatalog = (baseCatalog instanceof ViewCatalog) ? (ViewCatalog) 
baseCatalog : null;
   }
 
-  private static final String WAREHOUSE_LOCATION_BASEDIR =
-      "/tmp/iceberg_rest_server_warehouse_data/";
-
-  @Nonnull
-  protected Catalog createBasePolarisCatalog(Map<String, String> 
catalogProperties) {
-
-    BasePolarisCatalog catalogInstance =
-        new BasePolarisCatalog(
-            realmContext,
-            entityManager,
-            metaStoreManager,
-            session,
-            configurationStore,
-            diagnostics,
-            resolutionManifest,
-            securityContext,
-            taskExecutor,
-            fileIOFactory);
-
-    // TODO: The initialize properties might need to take more from the 
CatalogEntity.
-    catalogInstance.initialize(catalogName, catalogProperties);
-    return catalogInstance;
-  }
-
   private void authorizeBasicNamespaceOperationOrThrow(
       PolarisAuthorizableOperation op, Namespace namespace) {
     authorizeBasicNamespaceOperationOrThrow(op, namespace, null, null);
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
new file mode 100644
index 00000000..b300aa32
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.context;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
+
+public interface CallContextCatalogFactory {
+  Catalog createCallContextCatalog(
+      RealmContext realmContext,
+      AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      SecurityContext securityContext,
+      PolarisResolutionManifest resolvedManifest);
+}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
new file mode 100644
index 00000000..85773a3b
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.context;
+
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.SecurityContext;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
+import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.task.TaskExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RequestScoped
+public class PolarisCallContextCatalogFactory implements 
CallContextCatalogFactory {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PolarisCallContextCatalogFactory.class);
+
+  private static final String WAREHOUSE_LOCATION_BASEDIR =
+      "/tmp/iceberg_rest_server_warehouse_data/";
+
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+  private final PolarisDiagnostics diagnostics;
+  private final TaskExecutor taskExecutor;
+  private final FileIOFactory fileIOFactory;
+
+  @Inject
+  public PolarisCallContextCatalogFactory(
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore configurationStore,
+      PolarisDiagnostics diagnostics,
+      TaskExecutor taskExecutor,
+      FileIOFactory fileIOFactory) {
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = configurationStore;
+    this.diagnostics = diagnostics;
+    this.taskExecutor = taskExecutor;
+    this.fileIOFactory = fileIOFactory;
+  }
+
+  @Override
+  public Catalog createCallContextCatalog(
+      RealmContext realmContext,
+      AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      SecurityContext securityContext,
+      final PolarisResolutionManifest resolvedManifest) {
+    PolarisBaseEntity baseCatalogEntity =
+        
resolvedManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity();
+    String catalogName = baseCatalogEntity.getName();
+
+    String realm = realmContext.getRealmIdentifier();
+    String catalogKey = realm + "/" + catalogName;
+    LOGGER.info("Initializing new BasePolarisCatalog for key: {}", catalogKey);
+
+    BasePolarisCatalog catalogInstance =
+        new BasePolarisCatalog(
+            realmContext,
+            entityManager,
+            metaStoreManager,
+            metaStoreSession,
+            configurationStore,
+            diagnostics,
+            resolvedManifest,
+            securityContext,
+            taskExecutor,
+            fileIOFactory);
+
+    CatalogEntity catalog = CatalogEntity.of(baseCatalogEntity);
+    Map<String, String> catalogProperties = new 
HashMap<>(catalog.getPropertiesAsMap());
+    String defaultBaseLocation = catalog.getDefaultBaseLocation();
+    LOGGER.info("Looked up defaultBaseLocation {} for catalog {}", 
defaultBaseLocation, catalogKey);
+    catalogProperties.put(
+        CatalogProperties.WAREHOUSE_LOCATION,
+        Objects.requireNonNullElseGet(
+            defaultBaseLocation,
+            () -> Paths.get(WAREHOUSE_LOCATION_BASEDIR, 
catalogKey).toString()));
+
+    // TODO: The initialize properties might need to take more from 
CallContext and the
+    // CatalogEntity.
+    catalogInstance.initialize(catalogName, catalogProperties);
+
+    return catalogInstance;
+  }
+}
diff --git 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 8dd43bf3..a66f55b9 100644
--- 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -47,6 +47,8 @@ import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
 import org.apache.polaris.service.config.DefaultConfigurationStore;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
 import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TaskExecutor;
@@ -141,17 +143,27 @@ public record TestServices(
               realmEntityManagerFactory, metaStoreManagerFactory, 
configurationStore);
 
       TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
+
+      CallContextCatalogFactory callContextFactory =
+          new PolarisCallContextCatalogFactory(
+              entityManager,
+              metaStoreManager,
+              metaStoreSession,
+              configurationStore,
+              polarisDiagnostics,
+              Mockito.mock(TaskExecutor.class),
+              fileIOFactory);
+
       IcebergRestCatalogApiService service =
           new IcebergCatalogAdapter(
               realm,
+              callContextFactory,
               entityManager,
               metaStoreManager,
               metaStoreSession,
               configurationStore,
               polarisDiagnostics,
-              authorizer,
-              taskExecutor,
-              fileIOFactory);
+              authorizer);
 
       IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
 


Reply via email to