This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 316b9ae1 Revert "Remove CallContextCatalogFactory (#592)" (#943)
316b9ae1 is described below
commit 316b9ae1f60254d5631e36433151232640cadbfb
Author: Eric Maynard <[email protected]>
AuthorDate: Tue Feb 4 13:46:28 2025 -0800
Revert "Remove CallContextCatalogFactory (#592)" (#943)
* Revert "Remove CallContextCatalogFactory (#592)"
This reverts commit 8a46a451d6207d524cc423e815a119ec320635be.
* fix conflicts
* spotless
---
.../quarkus/admin/PolarisAuthzTestBase.java | 62 ++++++++++-
.../PolarisCatalogHandlerWrapperAuthzTest.java | 121 +++++++++++++-------
.../service/catalog/IcebergCatalogAdapter.java | 19 ++--
.../catalog/PolarisCatalogHandlerWrapper.java | 65 ++---------
.../service/context/CallContextCatalogFactory.java | 33 ++++++
.../context/PolarisCallContextCatalogFactory.java | 123 +++++++++++++++++++++
.../org/apache/polaris/service/TestServices.java | 18 ++-
7 files changed, 328 insertions(+), 113 deletions(-)
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 730572af..c51081fc 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableMap;
import io.quarkus.test.junit.QuarkusMock;
import io.quarkus.test.junit.QuarkusTestProfile;
import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.enterprise.inject.Alternative;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.SecurityContext;
import java.io.IOException;
@@ -37,6 +39,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
@@ -65,6 +68,7 @@ import
org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
@@ -72,7 +76,9 @@ import
org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.apache.polaris.service.task.TaskExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -85,6 +91,11 @@ public abstract class PolarisAuthzTestBase {
public static class Profile implements QuarkusTestProfile {
+ @Override
+ public Set<Class<?>> getEnabledAlternatives() {
+ return Set.of(TestPolarisCallContextCatalogFactory.class);
+ }
+
@Override
public Map<String, String> getConfigOverrides() {
return Map.of(
@@ -221,8 +232,6 @@ public abstract class PolarisAuthzTestBase {
.setName(CATALOG_NAME)
.setCatalogType("INTERNAL")
.setDefaultBaseLocation(storageLocation)
- .addProperty(
- CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO")
.setStorageConfigurationInfo(storageConfigModel,
storageLocation)
.build());
@@ -323,7 +332,7 @@ public abstract class PolarisAuthzTestBase {
Mockito.when(securityContext.getUserPrincipal()).thenReturn(p);
Set<String> principalRoleNames = loadPrincipalRolesNames(p);
Mockito.when(securityContext.isUserInRole(Mockito.anyString()))
- .thenAnswer(invocation -> principalRoleNames.contains((String)
invocation.getArgument(0)));
+ .thenAnswer(invocation ->
principalRoleNames.contains(invocation.getArgument(0)));
return securityContext;
}
@@ -407,6 +416,53 @@ public abstract class PolarisAuthzTestBase {
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO"));
}
+ @Alternative
+ @RequestScoped
+ public static class TestPolarisCallContextCatalogFactory
+ extends PolarisCallContextCatalogFactory {
+
+ public TestPolarisCallContextCatalogFactory() {
+ super(null, null, null, null, null, null, null);
+ }
+
+ @Inject
+ public TestPolarisCallContextCatalogFactory(
+ PolarisEntityManager entityManager,
+ PolarisMetaStoreManager metaStoreManager,
+ PolarisMetaStoreSession metaStoreSession,
+ PolarisConfigurationStore configurationStore,
+ PolarisDiagnostics diagnostics,
+ TaskExecutor taskExecutor,
+ FileIOFactory fileIOFactory) {
+ super(
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ diagnostics,
+ taskExecutor,
+ fileIOFactory);
+ }
+
+ @Override
+ public Catalog createCallContextCatalog(
+ RealmContext realmContext,
+ AuthenticatedPolarisPrincipal authenticatedPolarisPrincipal,
+ SecurityContext securityContext,
+ final PolarisResolutionManifest resolvedManifest) {
+ // This depends on the BasePolarisCatalog allowing calling initialize multiple times
+ // to override the previous config.
+ Catalog catalog =
+ super.createCallContextCatalog(
+ realmContext, authenticatedPolarisPrincipal, securityContext,
resolvedManifest);
+ catalog.initialize(
+ CATALOG_NAME,
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+ return catalog;
+ }
+ }
+
/**
* Tests each "sufficient" privilege individually by invoking {@code grantAction} for each set of
* privileges, running the action being tested, revoking after each test set, and also ensuring
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
index 507ac1af..6ba5cef1 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.polaris.service.quarkus.catalog;
+import com.google.common.collect.ImmutableMap;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.ws.rs.core.SecurityContext;
@@ -33,6 +34,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
@@ -52,12 +54,17 @@ import
org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.PrincipalWithCredentialsCredentials;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.CatalogRoleEntity;
import org.apache.polaris.core.entity.PolarisPrivilege;
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.service.catalog.PolarisCatalogHandlerWrapper;
+import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
@@ -87,24 +94,27 @@ public class PolarisCatalogHandlerWrapperAuthzTest extends
PolarisAuthzTestBase
}
private PolarisCatalogHandlerWrapper newWrapper(Set<String>
activatedPrincipalRoles) {
- return newWrapper(activatedPrincipalRoles, CATALOG_NAME);
+ return newWrapper(activatedPrincipalRoles, CATALOG_NAME,
newCatalogFactory());
}
private PolarisCatalogHandlerWrapper newWrapper(
- Set<String> activatedPrincipalRoles, String catalogName) {
+ Set<String> activatedPrincipalRoles, String catalogName,
CallContextCatalogFactory factory) {
final AuthenticatedPolarisPrincipal authenticatedPrincipal =
new AuthenticatedPolarisPrincipal(principalEntity,
activatedPrincipalRoles);
- SecurityContext securityContext =
- securityContext(authenticatedPrincipal, activatedPrincipalRoles);
- return newWrapper(securityContext, catalogName);
+ return new PolarisCatalogHandlerWrapper(
+ realmContext,
+ metaStoreSession,
+ configurationStore,
+ diagServices,
+ entityManager,
+ metaStoreManager,
+ securityContext(authenticatedPrincipal, activatedPrincipalRoles),
+ factory,
+ catalogName,
+ polarisAuthorizer);
}
private PolarisCatalogHandlerWrapper newWrapper(SecurityContext
securityContext) {
- return newWrapper(securityContext, CATALOG_NAME);
- }
-
- private PolarisCatalogHandlerWrapper newWrapper(
- SecurityContext securityContext, String catalogName) {
return new PolarisCatalogHandlerWrapper(
realmContext,
metaStoreSession,
@@ -113,8 +123,18 @@ public class PolarisCatalogHandlerWrapperAuthzTest extends
PolarisAuthzTestBase
entityManager,
metaStoreManager,
securityContext,
- catalogName,
- polarisAuthorizer,
+ newCatalogFactory(),
+ CATALOG_NAME,
+ polarisAuthorizer);
+ }
+
+ private CallContextCatalogFactory newCatalogFactory() {
+ return new TestPolarisCallContextCatalogFactory(
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ diagServices,
Mockito.mock(),
fileIOFactory);
}
@@ -1621,8 +1641,6 @@ public class PolarisCatalogHandlerWrapperAuthzTest
extends PolarisAuthzTestBase
.setName(externalCatalog)
.setDefaultBaseLocation(storageLocation)
.setStorageConfigurationInfo(storageConfigModel, storageLocation)
- .addProperty(
- CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO")
.setCatalogType("EXTERNAL")
.build());
adminService.createCatalogRole(
@@ -1685,23 +1703,49 @@ public class PolarisCatalogHandlerWrapperAuthzTest
extends PolarisAuthzTestBase
validatePayload.setTimestamp(530950845L);
validateRequest.setPayload(validatePayload);
- try (FileIO fileIO =
- CatalogUtil.loadFileIO(
- "org.apache.iceberg.inmemory.InMemoryFileIO", Map.of(), new
Configuration())) {
- TableMetadata tableMetadata =
- TableMetadata.buildFromEmpty()
- .addSchema(SCHEMA, SCHEMA.highestFieldId())
- .setLocation(
- String.format("%s/bucket/table/metadata/v1.metadata.json",
storageLocation))
- .addPartitionSpec(PartitionSpec.unpartitioned())
- .addSortOrder(SortOrder.unsorted())
- .assignUUID()
- .build();
- TableMetadataParser.overwrite(
- tableMetadata,
fileIO.newOutputFile(createPayload.getMetadataLocation()));
- TableMetadataParser.overwrite(
- tableMetadata,
fileIO.newOutputFile(updatePayload.getMetadataLocation()));
- }
+ PolarisCallContextCatalogFactory factory =
+ new PolarisCallContextCatalogFactory(
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ diagServices,
+ Mockito.mock(),
+ new DefaultFileIOFactory(
+ realmEntityManagerFactory, managerFactory,
configurationStore)) {
+ @Override
+ public Catalog createCallContextCatalog(
+ RealmContext realmContext,
+ AuthenticatedPolarisPrincipal authenticatedPolarisPrincipal,
+ SecurityContext securityContext,
+ PolarisResolutionManifest resolvedManifest) {
+ Catalog catalog =
+ super.createCallContextCatalog(
+ realmContext, authenticatedPolarisPrincipal,
securityContext, resolvedManifest);
+ String fileIoImpl = "org.apache.iceberg.inmemory.InMemoryFileIO";
+ catalog.initialize(
+ externalCatalog,
ImmutableMap.of(CatalogProperties.FILE_IO_IMPL, fileIoImpl));
+
+ try (FileIO fileIO =
+ CatalogUtil.loadFileIO(fileIoImpl, Map.of(), new
Configuration())) {
+ TableMetadata tableMetadata =
+ TableMetadata.buildFromEmpty()
+ .addSchema(SCHEMA, SCHEMA.highestFieldId())
+ .setLocation(
+ String.format(
+ "%s/bucket/table/metadata/v1.metadata.json",
storageLocation))
+ .addPartitionSpec(PartitionSpec.unpartitioned())
+ .addSortOrder(SortOrder.unsorted())
+ .assignUUID()
+ .build();
+ TableMetadataParser.overwrite(
+ tableMetadata,
fileIO.newOutputFile(createPayload.getMetadataLocation()));
+ TableMetadataParser.overwrite(
+ tableMetadata,
fileIO.newOutputFile(updatePayload.getMetadataLocation()));
+ }
+ return catalog;
+ }
+ };
List<Set<PolarisPrivilege>> sufficientPrivilegeSets =
List.of(
@@ -1725,18 +1769,19 @@ public class PolarisCatalogHandlerWrapperAuthzTest
extends PolarisAuthzTestBase
doTestSufficientPrivilegeSets(
sufficientPrivilegeSets,
() -> {
- newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+ newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
.sendNotification(table, createRequest);
- newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+ newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
.sendNotification(table, updateRequest);
- newWrapper(Set.of(PRINCIPAL_ROLE1),
externalCatalog).sendNotification(table, dropRequest);
- newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+ newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
+ .sendNotification(table, dropRequest);
+ newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
.sendNotification(table, validateRequest);
},
() -> {
- newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog)
+ newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog, factory)
.dropNamespace(Namespace.of("extns1", "extns2"));
- newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog)
+ newWrapper(Set.of(PRINCIPAL_ROLE2), externalCatalog, factory)
.dropNamespace(Namespace.of("extns1"));
},
PRINCIPAL_NAME,
@@ -1746,7 +1791,7 @@ public class PolarisCatalogHandlerWrapperAuthzTest
extends PolarisAuthzTestBase
doTestSufficientPrivilegeSets(
sufficientPrivilegeSets,
() -> {
- newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog)
+ newWrapper(Set.of(PRINCIPAL_ROLE1), externalCatalog, factory)
.sendNotification(table, validateRequest);
},
null /* cleanupAction */,
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
index a46b2af0..4998857e 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
@@ -68,8 +68,7 @@ import org.apache.polaris.core.persistence.resolver.Resolver;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
import
org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
-import org.apache.polaris.service.catalog.io.FileIOFactory;
-import org.apache.polaris.service.task.TaskExecutor;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.types.CommitTableRequest;
import org.apache.polaris.service.types.CommitViewRequest;
import org.apache.polaris.service.types.NotificationRequest;
@@ -120,35 +119,32 @@ public class IcebergCatalogAdapter
.build();
private final RealmContext realmContext;
+ private final CallContextCatalogFactory catalogFactory;
private final PolarisMetaStoreManager metaStoreManager;
private final PolarisEntityManager entityManager;
private final PolarisMetaStoreSession session;
private final PolarisConfigurationStore configurationStore;
private final PolarisDiagnostics diagnostics;
private final PolarisAuthorizer polarisAuthorizer;
- private final TaskExecutor taskExecutor;
- private final FileIOFactory fileIOFactory;
@Inject
public IcebergCatalogAdapter(
RealmContext realmContext,
+ CallContextCatalogFactory catalogFactory,
PolarisEntityManager entityManager,
PolarisMetaStoreManager metaStoreManager,
PolarisMetaStoreSession session,
PolarisConfigurationStore configurationStore,
PolarisDiagnostics diagnostics,
- PolarisAuthorizer polarisAuthorizer,
- TaskExecutor taskExecutor,
- FileIOFactory fileIOFactory) {
+ PolarisAuthorizer polarisAuthorizer) {
this.realmContext = realmContext;
+ this.catalogFactory = catalogFactory;
this.entityManager = entityManager;
this.metaStoreManager = metaStoreManager;
this.session = session;
this.configurationStore = configurationStore;
this.diagnostics = diagnostics;
this.polarisAuthorizer = polarisAuthorizer;
- this.taskExecutor = taskExecutor;
- this.fileIOFactory = fileIOFactory;
}
/**
@@ -186,10 +182,9 @@ public class IcebergCatalogAdapter
entityManager,
metaStoreManager,
securityContext,
+ catalogFactory,
catalogName,
- polarisAuthorizer,
- taskExecutor,
- fileIOFactory);
+ polarisAuthorizer);
}
@Override
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
index 26befced..95c8c485 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
@@ -20,26 +20,21 @@ package org.apache.polaris.service.catalog;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import jakarta.annotation.Nonnull;
import jakarta.ws.rs.core.SecurityContext;
import java.io.Closeable;
import java.io.IOException;
-import java.nio.file.Paths;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
@@ -84,7 +79,6 @@ import org.apache.polaris.core.auth.PolarisAuthorizer;
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.CatalogEntity;
-import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisEntityManager;
@@ -96,8 +90,7 @@ import
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.core.storage.PolarisStorageActions;
-import org.apache.polaris.service.catalog.io.FileIOFactory;
-import org.apache.polaris.service.task.TaskExecutor;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.types.NotificationRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,8 +123,7 @@ public class PolarisCatalogHandlerWrapper implements
AutoCloseable {
private final AuthenticatedPolarisPrincipal authenticatedPrincipal;
private final SecurityContext securityContext;
private final PolarisAuthorizer authorizer;
- private final TaskExecutor taskExecutor;
- private final FileIOFactory fileIOFactory;
+ private final CallContextCatalogFactory catalogFactory;
// Initialized in the authorize methods.
private PolarisResolutionManifest resolutionManifest = null;
@@ -150,10 +142,9 @@ public class PolarisCatalogHandlerWrapper implements
AutoCloseable {
PolarisEntityManager entityManager,
PolarisMetaStoreManager metaStoreManager,
SecurityContext securityContext,
+ CallContextCatalogFactory catalogFactory,
String catalogName,
- PolarisAuthorizer authorizer,
- TaskExecutor taskExecutor,
- FileIOFactory fileIOFactory) {
+ PolarisAuthorizer authorizer) {
this.realmContext = realmContext;
this.session = session;
this.entityManager = entityManager;
@@ -171,8 +162,7 @@ public class PolarisCatalogHandlerWrapper implements
AutoCloseable {
this.authenticatedPrincipal =
(AuthenticatedPolarisPrincipal) securityContext.getUserPrincipal();
this.authorizer = authorizer;
- this.taskExecutor = taskExecutor;
- this.fileIOFactory = fileIOFactory;
+ this.catalogFactory = catalogFactory;
}
/**
@@ -204,53 +194,14 @@ public class PolarisCatalogHandlerWrapper implements
AutoCloseable {
}
private void initializeCatalog() {
- PolarisBaseEntity baseCatalogEntity =
-
resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity();
- CatalogEntity catalog = CatalogEntity.of(baseCatalogEntity);
-
- String realm = realmContext.getRealmIdentifier();
- String catalogKey = realm + "/" + catalogName;
- LOGGER.info("Initializing new BasePolarisCatalog for key: {}", catalogKey);
-
- Map<String, String> catalogProperties = new
HashMap<>(catalog.getPropertiesAsMap());
- String defaultBaseLocation = catalog.getDefaultBaseLocation();
- LOGGER.info("Looked up defaultBaseLocation {} for catalog {}",
defaultBaseLocation, catalogKey);
- catalogProperties.put(
- CatalogProperties.WAREHOUSE_LOCATION,
- Objects.requireNonNullElseGet(
- defaultBaseLocation,
- () -> Paths.get(WAREHOUSE_LOCATION_BASEDIR,
catalogKey).toString()));
-
- this.baseCatalog = createBasePolarisCatalog(catalogProperties);
+ this.baseCatalog =
+ catalogFactory.createCallContextCatalog(
+ realmContext, authenticatedPrincipal, securityContext,
resolutionManifest);
this.namespaceCatalog =
(baseCatalog instanceof SupportsNamespaces) ? (SupportsNamespaces)
baseCatalog : null;
this.viewCatalog = (baseCatalog instanceof ViewCatalog) ? (ViewCatalog)
baseCatalog : null;
}
- private static final String WAREHOUSE_LOCATION_BASEDIR =
- "/tmp/iceberg_rest_server_warehouse_data/";
-
- @Nonnull
- protected Catalog createBasePolarisCatalog(Map<String, String>
catalogProperties) {
-
- BasePolarisCatalog catalogInstance =
- new BasePolarisCatalog(
- realmContext,
- entityManager,
- metaStoreManager,
- session,
- configurationStore,
- diagnostics,
- resolutionManifest,
- securityContext,
- taskExecutor,
- fileIOFactory);
-
- // TODO: The initialize properties might need to take more from the CatalogEntity.
- catalogInstance.initialize(catalogName, catalogProperties);
- return catalogInstance;
- }
-
private void authorizeBasicNamespaceOperationOrThrow(
PolarisAuthorizableOperation op, Namespace namespace) {
authorizeBasicNamespaceOperationOrThrow(op, namespace, null, null);
diff --git
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
b/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
new file mode 100644
index 00000000..b300aa32
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.context;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
+
+public interface CallContextCatalogFactory {
+ Catalog createCallContextCatalog(
+ RealmContext realmContext,
+ AuthenticatedPolarisPrincipal authenticatedPrincipal,
+ SecurityContext securityContext,
+ PolarisResolutionManifest resolvedManifest);
+}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
new file mode 100644
index 00000000..85773a3b
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.context;
+
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.SecurityContext;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
+import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.task.TaskExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RequestScoped
+public class PolarisCallContextCatalogFactory implements
CallContextCatalogFactory {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PolarisCallContextCatalogFactory.class);
+
+ private static final String WAREHOUSE_LOCATION_BASEDIR =
+ "/tmp/iceberg_rest_server_warehouse_data/";
+
+ private final PolarisEntityManager entityManager;
+ private final PolarisMetaStoreManager metaStoreManager;
+ private final PolarisMetaStoreSession metaStoreSession;
+ private final PolarisConfigurationStore configurationStore;
+ private final PolarisDiagnostics diagnostics;
+ private final TaskExecutor taskExecutor;
+ private final FileIOFactory fileIOFactory;
+
+ @Inject
+ public PolarisCallContextCatalogFactory(
+ PolarisEntityManager entityManager,
+ PolarisMetaStoreManager metaStoreManager,
+ PolarisMetaStoreSession metaStoreSession,
+ PolarisConfigurationStore configurationStore,
+ PolarisDiagnostics diagnostics,
+ TaskExecutor taskExecutor,
+ FileIOFactory fileIOFactory) {
+ this.entityManager = entityManager;
+ this.metaStoreManager = metaStoreManager;
+ this.metaStoreSession = metaStoreSession;
+ this.configurationStore = configurationStore;
+ this.diagnostics = diagnostics;
+ this.taskExecutor = taskExecutor;
+ this.fileIOFactory = fileIOFactory;
+ }
+
+ @Override
+ public Catalog createCallContextCatalog(
+ RealmContext realmContext,
+ AuthenticatedPolarisPrincipal authenticatedPrincipal,
+ SecurityContext securityContext,
+ final PolarisResolutionManifest resolvedManifest) {
+ PolarisBaseEntity baseCatalogEntity =
+
resolvedManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity();
+ String catalogName = baseCatalogEntity.getName();
+
+ String realm = realmContext.getRealmIdentifier();
+ String catalogKey = realm + "/" + catalogName;
+ LOGGER.info("Initializing new BasePolarisCatalog for key: {}", catalogKey);
+
+ BasePolarisCatalog catalogInstance =
+ new BasePolarisCatalog(
+ realmContext,
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ diagnostics,
+ resolvedManifest,
+ securityContext,
+ taskExecutor,
+ fileIOFactory);
+
+ CatalogEntity catalog = CatalogEntity.of(baseCatalogEntity);
+ Map<String, String> catalogProperties = new
HashMap<>(catalog.getPropertiesAsMap());
+ String defaultBaseLocation = catalog.getDefaultBaseLocation();
+ LOGGER.info("Looked up defaultBaseLocation {} for catalog {}",
defaultBaseLocation, catalogKey);
+ catalogProperties.put(
+ CatalogProperties.WAREHOUSE_LOCATION,
+ Objects.requireNonNullElseGet(
+ defaultBaseLocation,
+ () -> Paths.get(WAREHOUSE_LOCATION_BASEDIR,
catalogKey).toString()));
+
+ // TODO: The initialize properties might need to take more from CallContext and the
+ // CatalogEntity.
+ catalogInstance.initialize(catalogName, catalogProperties);
+
+ return catalogInstance;
+ }
+}
diff --git
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 8dd43bf3..a66f55b9 100644
---
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -47,6 +47,8 @@ import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TaskExecutor;
@@ -141,17 +143,27 @@ public record TestServices(
realmEntityManagerFactory, metaStoreManagerFactory,
configurationStore);
TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
+
+ CallContextCatalogFactory callContextFactory =
+ new PolarisCallContextCatalogFactory(
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ polarisDiagnostics,
+ Mockito.mock(TaskExecutor.class),
+ fileIOFactory);
+
IcebergRestCatalogApiService service =
new IcebergCatalogAdapter(
realm,
+ callContextFactory,
entityManager,
metaStoreManager,
metaStoreSession,
configurationStore,
polarisDiagnostics,
- authorizer,
- taskExecutor,
- fileIOFactory);
+ authorizer);
IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);