This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 23385a00 Extend FileIOFactory to Improve Customization (#724)
23385a00 is described below
commit 23385a00d89bc43e6b869eb0ea65ecb97b6c9bb7
Author: Rulin Xing <[email protected]>
AuthorDate: Wed Jan 29 13:35:50 2025 -0800
Extend FileIOFactory to Improve Customization (#724)
* Initial Commit
* Fix test errors
* Resolved some comments
* Resolved some comments
* Removed unused package
* Make FileIOFactory an ApplicationScoped bean
* Revert some changes
* Use FileIOFactory to get the creds for cleanup task
* Use new interface in tests
* Move some util classes to polaris-service-common as test fixtures
* Delete the interface in FileIOFactory that is only used for testing
* Refactor TestServices and use it in FileIOFactoryTest
* Small change
* Rename TestFileIOFactory to MeasuredFileIOFactory
* Use FileIOFactory to load the catalog's default FileIO in tests; fix log level for FileIOUtil
---------
Co-authored-by: Rulin Xing <[email protected]>
---
quarkus/service/build.gradle.kts | 1 +
.../polaris/service/quarkus/TestServices.java | 163 ---------------
.../quarkus/admin/ManagementServiceTest.java | 9 +-
.../quarkus/admin/PolarisAuthzTestBase.java | 8 +-
.../admin/PolarisOverlappingCatalogTest.java | 6 +-
.../quarkus/admin/PolarisOverlappingTableTest.java | 4 +-
.../quarkus/catalog/BasePolarisCatalogTest.java | 80 +++----
.../catalog/BasePolarisCatalogViewTest.java | 6 +-
.../PolarisCatalogHandlerWrapperAuthzTest.java | 3 +-
.../quarkus/catalog/io/FileIOExceptionsTest.java | 9 +-
service/common/build.gradle.kts | 26 +++
.../service/catalog/BasePolarisCatalog.java | 135 ++++--------
.../service/catalog/io/DefaultFileIOFactory.java | 91 +++++++-
.../polaris/service/catalog/io/FileIOFactory.java | 38 +++-
.../polaris/service/catalog/io/FileIOUtil.java | 131 ++++++++++++
.../catalog/io/WasbTranslatingFileIOFactory.java | 43 +++-
.../polaris/service/task/TaskFileIOSupplier.java | 55 ++---
.../service/catalog/io/FileIOFactoryTest.java | 229 +++++++++++++++++++++
.../org/apache/polaris/service/TestServices.java | 217 +++++++++++++++++++
.../catalog/PolarisPassthroughResolutionView.java | 2 +-
.../polaris/service/catalog/io/MeasuredFileIO.java | 10 +-
.../service/catalog/io/MeasuredFileIOFactory.java | 59 ++++--
.../service/catalog/io/MeasuredInputFile.java | 6 +-
23 files changed, 947 insertions(+), 384 deletions(-)
diff --git a/quarkus/service/build.gradle.kts b/quarkus/service/build.gradle.kts
index 72646486..7ab5ec87 100644
--- a/quarkus/service/build.gradle.kts
+++ b/quarkus/service/build.gradle.kts
@@ -90,6 +90,7 @@ dependencies {
testFixturesApi(project(":polaris-tests"))
testImplementation(project(":polaris-api-management-model"))
+ testImplementation(testFixtures(project(":polaris-service-common")))
testImplementation("org.apache.iceberg:iceberg-api:${libs.versions.iceberg.get()}:tests")
testImplementation("org.apache.iceberg:iceberg-core:${libs.versions.iceberg.get()}:tests")
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java
deleted file mode 100644
index 81d34dd5..00000000
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.quarkus;
-
-import com.google.auth.oauth2.AccessToken;
-import com.google.auth.oauth2.GoogleCredentials;
-import jakarta.ws.rs.core.SecurityContext;
-import java.security.Principal;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-import org.apache.polaris.core.PolarisDiagnostics;
-import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
-import org.apache.polaris.core.auth.PolarisAuthorizer;
-import org.apache.polaris.core.context.RealmId;
-import org.apache.polaris.core.entity.PolarisEntity;
-import org.apache.polaris.core.entity.PrincipalEntity;
-import org.apache.polaris.core.persistence.PolarisEntityManager;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
-import org.apache.polaris.service.admin.PolarisServiceImpl;
-import org.apache.polaris.service.admin.api.PolarisCatalogsApi;
-import org.apache.polaris.service.catalog.IcebergCatalogAdapter;
-import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi;
-import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
-import org.apache.polaris.service.catalog.io.FileIOFactory;
-import org.apache.polaris.service.config.DefaultConfigurationStore;
-import org.apache.polaris.service.config.RealmEntityManagerFactory;
-import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
-import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
-import org.apache.polaris.service.task.TaskExecutor;
-import org.mockito.Mockito;
-
-public record TestServices(
- IcebergRestCatalogApi restApi,
- PolarisCatalogsApi catalogsApi,
- RealmId realmId,
- SecurityContext securityContext) {
-
- private static final RealmId testRealm = RealmId.newRealmId("test-realm");
-
- public static TestServices inMemory(Map<String, Object> config) {
- return inMemory(new TestFileIOFactory(), config);
- }
-
- public static TestServices inMemory(FileIOFactory ioFactory) {
- return inMemory(ioFactory, Map.of());
- }
-
- public static TestServices inMemory(FileIOFactory ioFactory, Map<String, Object> config) {
-
- DefaultConfigurationStore configurationStore = new DefaultConfigurationStore(config);
- PolarisDiagnostics polarisDiagnostics = Mockito.mock(PolarisDiagnostics.class);
-
- PolarisStorageIntegrationProviderImpl storageIntegrationProvider =
- new PolarisStorageIntegrationProviderImpl(
- Mockito::mock,
- () -> GoogleCredentials.create(new AccessToken("abc", new Date())),
- configurationStore);
-
- InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory =
- new InMemoryPolarisMetaStoreManagerFactory(
- storageIntegrationProvider,
- configurationStore,
- polarisDiagnostics,
- Clock.systemDefaultZone());
-
- PolarisMetaStoreManager metaStoreManager =
- metaStoreManagerFactory.getOrCreateMetaStoreManager(testRealm);
-
- PolarisMetaStoreSession session =
- metaStoreManagerFactory.getOrCreateSessionSupplier(testRealm).get();
-
- RealmEntityManagerFactory realmEntityManagerFactory =
- new RealmEntityManagerFactory(metaStoreManagerFactory, polarisDiagnostics) {};
-
- PolarisEntityManager entityManager =
- realmEntityManagerFactory.getOrCreateEntityManager(testRealm);
-
- PolarisAuthorizer authorizer = Mockito.mock(PolarisAuthorizer.class);
-
- IcebergRestCatalogApiService service =
- new IcebergCatalogAdapter(
- testRealm,
- entityManager,
- metaStoreManager,
- session,
- configurationStore,
- polarisDiagnostics,
- authorizer,
- Mockito.mock(TaskExecutor.class),
- ioFactory);
-
- IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
-
- PolarisMetaStoreManager.CreatePrincipalResult createdPrincipal =
- metaStoreManager.createPrincipal(
- session,
- new PrincipalEntity.Builder()
- .setName("test-principal")
- .setCreateTimestamp(Instant.now().toEpochMilli())
- .setCredentialRotationRequiredState()
- .build());
-
- AuthenticatedPolarisPrincipal principal =
- new AuthenticatedPolarisPrincipal(
- PolarisEntity.of(createdPrincipal.getPrincipal()), Set.of());
-
- SecurityContext securityContext =
- new SecurityContext() {
- @Override
- public Principal getUserPrincipal() {
- return principal;
- }
-
- @Override
- public boolean isUserInRole(String s) {
- return false;
- }
-
- @Override
- public boolean isSecure() {
- return true;
- }
-
- @Override
- public String getAuthenticationScheme() {
- return "";
- }
- };
-
- PolarisCatalogsApi catalogsApi =
- new PolarisCatalogsApi(
- new PolarisServiceImpl(
- entityManager,
- metaStoreManager,
- session,
- configurationStore,
- authorizer,
- polarisDiagnostics));
-
- return new TestServices(restApi, catalogsApi, testRealm, securityContext);
- }
-}
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java
index 4af7b233..c4e10852 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java
@@ -32,15 +32,14 @@ import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.PolarisCatalog;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.admin.model.UpdateCatalogRequest;
-import org.apache.polaris.service.quarkus.TestServices;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
+import org.apache.polaris.service.TestServices;
import org.junit.jupiter.api.Test;
public class ManagementServiceTest {
static TestServices services =
- TestServices.inMemory(
- new TestFileIOFactory(),
- Map.of("SUPPORTED_CATALOG_STORAGE_TYPES", List.of("S3", "GCS", "AZURE")));
+ TestServices.builder()
+ .config(Map.of("SUPPORTED_CATALOG_STORAGE_TYPES", List.of("S3", "GCS", "AZURE")))
+ .build();
@Test
public void testCreateCatalogWithDisallowedStorageConfig() {
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 485f4634..629951df 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -67,10 +67,11 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
-import org.apache.polaris.service.quarkus.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
@@ -160,6 +161,7 @@ public abstract class PolarisAuthzTestBase {
protected PolarisEntityManager entityManager;
protected PolarisMetaStoreManager metaStoreManager;
protected PolarisMetaStoreSession metaStoreSession;
+ protected FileIOFactory fileIOFactory;
protected PolarisBaseEntity catalogEntity;
protected PrincipalEntity principalEntity;
protected RealmId realmId;
@@ -385,6 +387,8 @@ public abstract class PolarisAuthzTestBase {
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
entityManager, metaStoreSession, securityContext, CATALOG_NAME);
+ this.fileIOFactory =
+ new DefaultFileIOFactory(realmEntityManagerFactory, managerFactory, configurationStore);
this.baseCatalog =
new BasePolarisCatalog(
realmId,
@@ -396,7 +400,7 @@ public abstract class PolarisAuthzTestBase {
passthroughView,
securityContext,
Mockito.mock(),
- new DefaultFileIOFactory());
+ fileIOFactory);
this.baseCatalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
index 61a3fdc5..9e60049c 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
@@ -33,16 +33,14 @@ import org.apache.polaris.core.admin.model.Catalog;
import org.apache.polaris.core.admin.model.CatalogProperties;
import org.apache.polaris.core.admin.model.CreateCatalogRequest;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
-import org.apache.polaris.service.quarkus.TestServices;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
+import org.apache.polaris.service.TestServices;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
public class PolarisOverlappingCatalogTest {
static TestServices services =
- TestServices.inMemory(
- new TestFileIOFactory(), Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", "false"));
+ TestServices.builder().config(Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", "false")).build();
private Response createCatalog(String prefix, String defaultBaseLocation, boolean isExternal) {
return createCatalog(prefix, defaultBaseLocation, isExternal, new ArrayList<String>());
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
index 2791bfa7..03a5b723 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
@@ -36,7 +36,7 @@ import org.apache.polaris.core.admin.model.CatalogProperties;
import org.apache.polaris.core.admin.model.CreateCatalogRequest;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
-import org.apache.polaris.service.quarkus.TestServices;
+import org.apache.polaris.service.TestServices;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -105,7 +105,7 @@ public class PolarisOverlappingTableTest {
Map<String, Object> serverConfig,
Map<String, String> catalogConfig,
int expectedStatusForOverlaps) {
- TestServices services = TestServices.inMemory(serverConfig);
+ TestServices services = TestServices.builder().config(serverConfig).build();
CatalogProperties.Builder propertiesBuilder =
CatalogProperties.builder()
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
index 38580b9a..cab70907 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -96,11 +95,12 @@ import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.exception.IcebergExceptionMapper;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
import org.apache.polaris.service.task.TaskExecutor;
@@ -163,6 +163,7 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
private PolarisMetaStoreSession metaStoreSession;
private PolarisAdminService adminService;
private PolarisEntityManager entityManager;
+ private FileIOFactory fileIOFactory;
private AuthenticatedPolarisPrincipal authenticatedRoot;
private PolarisEntity catalogEntity;
private SecurityContext securityContext;
@@ -241,22 +242,9 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
new PolarisPassthroughResolutionView(
entityManager, metaStoreSession, securityContext, CATALOG_NAME);
TaskExecutor taskExecutor = Mockito.mock();
- this.catalog =
- new BasePolarisCatalog(
- realmId,
- entityManager,
- metaStoreManager,
- metaStoreSession,
- configurationStore,
- diagServices,
- passthroughView,
- securityContext,
- taskExecutor,
- new DefaultFileIOFactory());
- this.catalog.initialize(
- CATALOG_NAME,
- ImmutableMap.of(
- CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
+ this.fileIOFactory =
+ new DefaultFileIOFactory(entityManagerFactory, managerFactory, configurationStore);
+
StsClient stsClient = Mockito.mock(StsClient.class);
when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
.thenReturn(
@@ -273,6 +261,23 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
when(storageIntegrationProvider.getStorageIntegrationForConfig(
isA(AwsStorageConfigurationInfo.class)))
.thenReturn((PolarisStorageIntegration) storageIntegration);
+
+ this.catalog =
+ new BasePolarisCatalog(
+ realmId,
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ diagServices,
+ passthroughView,
+ securityContext,
+ taskExecutor,
+ fileIOFactory);
+ this.catalog.initialize(
+ CATALOG_NAME,
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
}
@AfterEach
@@ -509,7 +514,8 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
// filename.
final String tableLocation = "s3://externally-owned-bucket/validate_table/";
final String tableMetadataLocation = tableLocation + "metadata/";
- FileIOFactory fileIoFactory = spy(new DefaultFileIOFactory());
+ FileIOFactory fileIoFactory =
+ spy(new DefaultFileIOFactory(entityManagerFactory, managerFactory, configurationStore));
BasePolarisCatalog catalog =
new BasePolarisCatalog(
realmId,
@@ -541,7 +547,7 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
doThrow(new ForbiddenException("Fake failure applying downscoped credentials"))
.when(fileIoFactory)
- .loadFileIO(any(), any());
+ .loadFileIO(any(), any(), any(), any(), any(), any(), any());
Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, request))
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining("Fake failure applying downscoped credentials");
@@ -849,7 +855,7 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
passthroughView,
securityContext,
taskExecutor,
- new DefaultFileIOFactory());
+ fileIOFactory);
catalog.initialize(
catalogWithoutStorage,
ImmutableMap.of(
@@ -914,7 +920,7 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
passthroughView,
securityContext,
taskExecutor,
- new DefaultFileIOFactory());
+ fileIOFactory);
catalog.initialize(
catalogName,
ImmutableMap.of(
@@ -1406,10 +1412,7 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
.containsEntry(PolarisCredentialProperty.AWS_KEY_ID, TEST_ACCESS_KEY)
.containsEntry(PolarisCredentialProperty.AWS_SECRET_KEY, SECRET_ACCESS_KEY)
.containsEntry(PolarisCredentialProperty.AWS_TOKEN, SESSION_TOKEN);
- FileIO fileIO =
- new TaskFileIOSupplier(
- createMockMetaStoreManagerFactory(), new DefaultFileIOFactory(), configurationStore)
- .apply(taskEntity, realmId);
+ FileIO fileIO = new TaskFileIOSupplier(fileIOFactory).apply(taskEntity, realmId);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
}
@@ -1450,7 +1453,7 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
passthroughView,
securityContext,
Mockito.mock(),
- new DefaultFileIOFactory());
+ fileIOFactory);
noPurgeCatalog.initialize(
noPurgeCatalogName,
ImmutableMap.of(
@@ -1522,7 +1525,8 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
new PolarisPassthroughResolutionView(
entityManager, metaStoreSession, securityContext, CATALOG_NAME);
- TestFileIOFactory measured = new TestFileIOFactory();
+ MeasuredFileIOFactory measured =
+ new MeasuredFileIOFactory(entityManagerFactory, managerFactory, configurationStore);
BasePolarisCatalog catalog =
new BasePolarisCatalog(
realmId,
@@ -1558,22 +1562,24 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
.isGreaterThan(0);
Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue();
+ TaskEntity taskEntity =
+ TaskEntity.of(
+ metaStoreManager
+ .loadTasks(metaStoreSession, "testExecutor", 1)
+ .getEntities()
+ .getFirst());
+ Map<String, String> properties = taskEntity.getInternalPropertiesAsMap();
+ properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO");
+ taskEntity.setInternalPropertiesAsMap(properties);
TableCleanupTaskHandler handler =
new TableCleanupTaskHandler(
Mockito.mock(),
createMockMetaStoreManagerFactory(),
configurationStore,
diagServices,
- (task, rc) ->
- measured.loadFileIO("org.apache.iceberg.inmemory.InMemoryFileIO", Map.of()),
+ new TaskFileIOSupplier(measured),
clock);
- handler.handleTask(
- TaskEntity.of(
- metaStoreManager
- .loadTasks(metaStoreSession, "testExecutor", 1)
- .getEntities()
- .getFirst()),
- realmId);
+ handler.handleTask(taskEntity, realmId);
Assertions.assertThat(measured.getNumDeletedFiles()).as("A table was deleted").isGreaterThan(0);
}
}
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java
index b6bbefef..f5cdbcaf 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java
@@ -56,7 +56,9 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.junit.jupiter.api.AfterEach;
@@ -171,6 +173,8 @@ public class BasePolarisCatalogViewTest extends ViewCatalogTests<BasePolarisCata
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
entityManager, metaStoreSession, securityContext, CATALOG_NAME);
+ FileIOFactory fileIOFactory =
+ new DefaultFileIOFactory(entityManagerFactory, managerFactory, configurationStore);
this.catalog =
new BasePolarisCatalog(
realmId,
@@ -182,7 +186,7 @@ public class BasePolarisCatalogViewTest extends ViewCatalogTests<BasePolarisCata
passthroughView,
securityContext,
Mockito.mock(),
- new DefaultFileIOFactory());
+ fileIOFactory);
this.catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
index 5f6b0dec..ef3b7fa5 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
@@ -58,7 +58,6 @@ import org.apache.polaris.core.entity.PolarisPrivilege;
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.service.catalog.PolarisCatalogHandlerWrapper;
-import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
@@ -117,7 +116,7 @@ public class PolarisCatalogHandlerWrapperAuthzTest extends PolarisAuthzTestBase
catalogName,
polarisAuthorizer,
Mockito.mock(),
- new DefaultFileIOFactory());
+ fileIOFactory);
}
/**
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
index 13349c82..e78d9a68 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
@@ -38,7 +38,8 @@ import org.apache.polaris.core.admin.model.CreateCatalogRequest;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.PolarisCatalog;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
-import org.apache.polaris.service.quarkus.TestServices;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -53,13 +54,13 @@ public class FileIOExceptionsTest {
private static final String catalog = "test-catalog";
private static final String catalogBaseLocation =
"file:/tmp/buckets/my-bucket/path/to/data";
- private static TestFileIOFactory ioFactory;
private static TestServices services;
+ private static MeasuredFileIOFactory ioFactory;
@BeforeAll
public static void beforeAll() {
- ioFactory = new TestFileIOFactory();
- services = TestServices.inMemory(ioFactory);
+ services = TestServices.builder().build();
+ ioFactory = (MeasuredFileIOFactory) services.fileIOFactory();
FileStorageConfigInfo storageConfigInfo =
FileStorageConfigInfo.builder()
diff --git a/service/common/build.gradle.kts b/service/common/build.gradle.kts
index 79d2809c..58c99286 100644
--- a/service/common/build.gradle.kts
+++ b/service/common/build.gradle.kts
@@ -19,6 +19,7 @@
plugins {
id("polaris-server")
+ id("java-test-fixtures")
alias(libs.plugins.jandex)
}
@@ -93,6 +94,31 @@ dependencies {
testImplementation(libs.assertj.core)
testImplementation(libs.mockito.core)
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
+
+ testFixturesImplementation(project(":polaris-core"))
+ testFixturesImplementation(project(":polaris-api-management-model"))
+ testFixturesImplementation(project(":polaris-api-management-service"))
+ testFixturesImplementation(project(":polaris-api-iceberg-service"))
+
+ testFixturesImplementation(libs.jakarta.enterprise.cdi.api)
+ testFixturesImplementation(libs.jakarta.annotation.api)
+ testFixturesImplementation(libs.jakarta.ws.rs.api)
+
+ testFixturesImplementation(platform(libs.quarkus.bom))
+ testFixturesImplementation("io.quarkus:quarkus-rest-client")
+ testFixturesImplementation("io.quarkus:quarkus-rest-client-jackson")
+
+ testFixturesImplementation(platform(libs.iceberg.bom))
+ testFixturesImplementation("org.apache.iceberg:iceberg-api")
+ testFixturesImplementation("org.apache.iceberg:iceberg-core")
+ testFixturesImplementation("org.apache.iceberg:iceberg-aws")
+
+ testFixturesImplementation(platform(libs.google.cloud.storage.bom))
+ testFixturesImplementation("com.google.cloud:google-cloud-storage")
+ testFixturesImplementation(platform(libs.awssdk.bom))
+ testFixturesImplementation("software.amazon.awssdk:sts")
+ testFixturesImplementation("software.amazon.awssdk:iam-policy-builder")
+ testFixturesImplementation("software.amazon.awssdk:s3")
}
tasks.named("javadoc") { dependsOn("jandex") }
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index 06bbc84d..91c18b48 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -93,6 +93,7 @@ import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
@@ -104,6 +105,7 @@ import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOUtil;
import org.apache.polaris.service.exception.IcebergExceptionMapper;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.NotificationRequest;
@@ -324,7 +326,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
String.format("Failed to fetch resolved parent for TableIdentifier '%s'", identifier));
}
FileIO fileIO =
- refreshIOWithCredentials(
+ loadFileIOForTableLike(
identifier,
Set.of(locationDir),
resolvedParent,
@@ -818,10 +820,15 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
.log("Table entity has no storage configuration in its hierarchy");
return Map.of();
}
- return refreshCredentials(
+ return FileIOUtil.refreshCredentials(
+ realmId,
+ entityManager,
+ getCredentialVendor(),
+ metaStoreSession,
+ configurationStore,
tableIdentifier,
- storageActions,
getLocationsAllowedToBeAccessed(tableMetadata),
+ storageActions,
storageInfo.get());
}
@@ -857,62 +864,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
? resolvedEntityView.getResolvedPath(tableIdentifier.namespace())
: resolvedTableEntities;
- return findStorageInfoFromHierarchy(resolvedStorageEntity);
- }
-
- private Map<String, String> refreshCredentials(
- TableIdentifier tableIdentifier,
- Set<PolarisStorageActions> storageActions,
- String tableLocation,
- PolarisEntity entity) {
- return refreshCredentials(tableIdentifier, storageActions,
Set.of(tableLocation), entity);
- }
-
- private Map<String, String> refreshCredentials(
- TableIdentifier tableIdentifier,
- Set<PolarisStorageActions> storageActions,
- Set<String> tableLocations,
- PolarisEntity entity) {
- Boolean skipCredentialSubscopingIndirection =
- getBooleanContextConfiguration(
- PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
-
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
- if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
- LOGGER
- .atInfo()
- .addKeyValue("tableIdentifier", tableIdentifier)
- .log("Skipping generation of subscoped creds for table");
- return Map.of();
- }
-
- boolean allowList =
- storageActions.contains(PolarisStorageActions.LIST)
- || storageActions.contains(PolarisStorageActions.ALL);
- Set<String> writeLocations =
- storageActions.contains(PolarisStorageActions.WRITE)
- || storageActions.contains(PolarisStorageActions.DELETE)
- || storageActions.contains(PolarisStorageActions.ALL)
- ? tableLocations
- : Set.of();
- Map<String, String> credentialsMap =
- entityManager
- .getCredentialCache()
- .getOrGenerateSubScopeCreds(
- getCredentialVendor(),
- metaStoreSession,
- entity,
- allowList,
- tableLocations,
- writeLocations);
- LOGGER
- .atDebug()
- .addKeyValue("tableIdentifier", tableIdentifier)
- .addKeyValue("credentialKeys", credentialsMap.keySet())
- .log("Loaded scoped credentials for table");
- if (credentialsMap.isEmpty()) {
- LOGGER.debug("No credentials found for table");
- }
- return credentialsMap;
+ return FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity);
}
/**
@@ -1243,7 +1195,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
// then we should use the actual current table properties for IO
refresh here
// instead of the general tableDefaultProperties.
FileIO fileIO =
- refreshIOWithCredentials(
+ loadFileIOForTableLike(
tableIdentifier,
Set.of(latestLocationDir),
resolvedEntities,
@@ -1279,7 +1231,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
// refresh credentials because we need to read the metadata file to
validate its location
tableFileIO =
- refreshIOWithCredentials(
+ loadFileIOForTableLike(
tableIdentifier,
getLocationsAllowedToBeAccessed(metadata),
resolvedStorageEntity,
@@ -1418,18 +1370,6 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
}
}
- private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
- PolarisResolvedPathWrapper resolvedStorageEntity) {
- Optional<PolarisEntity> storageInfoEntity =
- resolvedStorageEntity.getRawFullPath().reversed().stream()
- .filter(
- e ->
- e.getInternalPropertiesAsMap()
-
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
- .findFirst();
- return storageInfoEntity;
- }
-
private class BasePolarisViewOperations extends BaseViewOperations {
private final TableIdentifier identifier;
private final String fullViewName;
@@ -1475,7 +1415,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
// then we should use the actual current table properties for IO
refresh here
// instead of the general tableDefaultProperties.
FileIO fileIO =
- refreshIOWithCredentials(
+ loadFileIOForTableLike(
identifier,
Set.of(latestLocationDir),
resolvedEntities,
@@ -1529,7 +1469,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
Map<String, String> tableProperties = new
HashMap<>(metadata.properties());
viewFileIO =
- refreshIOWithCredentials(
+ loadFileIOForTableLike(
identifier,
getLocationsAllowedToBeAccessed(metadata),
resolvedStorageEntity,
@@ -1586,27 +1526,22 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
}
}
- private FileIO refreshIOWithCredentials(
+ private FileIO loadFileIOForTableLike(
TableIdentifier identifier,
Set<String> readLocations,
PolarisResolvedPathWrapper resolvedStorageEntity,
Map<String, String> tableProperties,
Set<PolarisStorageActions> storageActions) {
- Optional<PolarisEntity> storageInfoEntity =
findStorageInfoFromHierarchy(resolvedStorageEntity);
- Map<String, String> credentialsMap =
- storageInfoEntity
- .map(
- storageInfo ->
- refreshCredentials(identifier, storageActions,
readLocations, storageInfo))
- .orElse(Map.of());
-
- // Update the FileIO before we write the new metadata file
- // update with table properties in case there are table-level overrides
- // the credentials should always override table-level properties, since
- // storage configuration will be found at whatever entity defines it
- tableProperties.putAll(credentialsMap);
- FileIO fileIO = null;
- fileIO = loadFileIO(ioImplClassName, tableProperties);
+ // Reload fileIO based on table specific context
+ FileIO fileIO =
+ fileIOFactory.loadFileIO(
+ realmId,
+ ioImplClassName,
+ tableProperties,
+ identifier,
+ readLocations,
+ storageActions,
+ resolvedStorageEntity);
// ensure the new fileIO is closed when the catalog is closed
closeableGroup.addCloseable(fileIO);
return fileIO;
@@ -1888,7 +1823,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
.toArray(String[]::new));
resolvedStorageEntity = resolvedEntityView.getResolvedPath(nsLevel);
if (resolvedStorageEntity != null) {
- storageInfoEntity =
findStorageInfoFromHierarchy(resolvedStorageEntity);
+ storageInfoEntity =
FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity);
break;
}
}
@@ -1905,7 +1840,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
// Validate that we can construct a FileIO
String locationDir = metadataLocation.substring(0,
metadataLocation.lastIndexOf("/"));
- refreshIOWithCredentials(
+ loadFileIOForTableLike(
tableIdentifier,
Set.of(locationDir),
resolvedStorageEntity,
@@ -1960,7 +1895,7 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
String locationDir = newLocation.substring(0,
newLocation.lastIndexOf("/"));
FileIO fileIO =
- refreshIOWithCredentials(
+ loadFileIOForTableLike(
tableIdentifier,
Set.of(locationDir),
resolvedParent,
@@ -2039,8 +1974,16 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
* @return FileIO object
*/
private FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
- Map<String, String> propertiesWithS3CustomizedClientFactory = new
HashMap<>(properties);
- return fileIOFactory.loadFileIO(ioImpl,
propertiesWithS3CustomizedClientFactory);
+ // Catalog-level FileIO: there is no table-specific context here, so the catalog entity itself
+ // supplies the identifier, the only access location (its default base location) and the
+ // single-element resolved path that the factory searches for a storage configuration.
+ // NOTE(review): TableLikeEntity.of(catalogEntity) wraps a catalog rather than a table-like
+ // entity; confirm getTableIdentifier() yields a sensible value for it.
+ TableLikeEntity tableLikeEntity = TableLikeEntity.of(catalogEntity);
+ TableIdentifier identifier = tableLikeEntity.getTableIdentifier();
+ // NOTE(review): Set.of(...) throws NullPointerException if getDefaultBaseLocation() returns
+ // null; confirm every catalog has a default base location set.
+ Set<String> locations = Set.of(catalogEntity.getDefaultBaseLocation());
+ ResolvedPolarisEntity resolvedCatalogEntity =
+ new ResolvedPolarisEntity(catalogEntity, List.of(), List.of());
+ PolarisResolvedPathWrapper resolvedPath =
+ new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity));
+ // Request PolarisStorageActions.ALL on the default base location.
+ Set<PolarisStorageActions> storageActions =
Set.of(PolarisStorageActions.ALL);
+ return fileIOFactory.loadFileIO(
+ realmId, ioImpl, properties, identifier, locations, storageActions,
resolvedPath);
}
private void blockedUserSpecifiedWriteLocation(Map<String, String>
properties) {
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
index b962ec79..a92d4c35 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
@@ -18,19 +18,104 @@
*/
package org.apache.polaris.service.catalog.io;
+import com.google.common.annotations.VisibleForTesting;
import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
-/** A simple FileIOFactory implementation that defers all the work to the
Iceberg SDK */
+/**
+ * A default FileIO factory implementation for creating Iceberg {@link FileIO} instances with
+ * contextual table-level properties.
+ *
+ * <p>This class acts as a translation layer between Polaris properties and the properties required
+ * by Iceberg's {@link FileIO}. For example, it evaluates storage actions and retrieves subscoped
+ * credentials to initialize a {@link FileIO} instance with the most limited permissions necessary.
+ */
@ApplicationScoped
@Identifier("default")
public class DefaultFileIOFactory implements FileIOFactory {
+
+ private final RealmEntityManagerFactory realmEntityManagerFactory;
+ private final MetaStoreManagerFactory metaStoreManagerFactory;
+ private final PolarisConfigurationStore configurationStore;
+
+ @Inject
+ public DefaultFileIOFactory(
+ RealmEntityManagerFactory realmEntityManagerFactory,
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ PolarisConfigurationStore configurationStore) {
+ this.realmEntityManagerFactory = realmEntityManagerFactory;
+ this.metaStoreManagerFactory = metaStoreManagerFactory;
+ this.configurationStore = configurationStore;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Finds the entity closest to the leaf of {@code resolvedEntityPath} that carries a storage
+ * configuration, obtains subscoped credentials for it via {@link FileIOUtil#refreshCredentials},
+ * and overlays them on a copy of {@code properties}. The caller's {@code properties} map is not
+ * modified.
+ */
@Override
- public FileIO loadFileIO(String impl, Map<String, String> properties) {
- return CatalogUtil.loadFileIO(impl, properties, new Configuration());
+ public FileIO loadFileIO(
+ @Nonnull RealmId realmId,
+ @Nonnull String ioImplClassName,
+ @Nonnull Map<String, String> properties,
+ @Nonnull TableIdentifier identifier,
+ @Nonnull Set<String> tableLocations,
+ @Nonnull Set<PolarisStorageActions> storageActions,
+ @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
+ PolarisEntityManager entityManager =
+ realmEntityManagerFactory.getOrCreateEntityManager(realmId);
+ PolarisCredentialVendor credentialVendor =
+ metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
+ PolarisMetaStoreSession metaStoreSession =
+ metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
+
+ // Get subscoped creds for the entity that defines the storage configuration; if no entity in
+ // the hierarchy defines one, no credentials are added.
+ Optional<PolarisEntity> storageInfoEntity =
+ FileIOUtil.findStorageInfoFromHierarchy(resolvedEntityPath);
+ Map<String, String> credentialsMap =
+ storageInfoEntity
+ .map(
+ storageInfo ->
+ FileIOUtil.refreshCredentials(
+ realmId,
+ entityManager,
+ credentialVendor,
+ metaStoreSession,
+ configurationStore,
+ identifier,
+ tableLocations,
+ storageActions,
+ storageInfo))
+ .orElse(Map.of());
+
+ // Build the FileIO properties in a fresh map instead of reassigning the parameter, so the
+ // caller's map is left untouched. Credentials are applied last so they always override
+ // table-level properties with the same key: the storage configuration is authoritative,
+ // whichever entity in the hierarchy defines it.
+ Map<String, String> fileIOProperties = new HashMap<>(properties);
+ fileIOProperties.putAll(credentialsMap);
+
+ return loadFileIOInternal(ioImplClassName, fileIOProperties);
+ }
+
+ /**
+ * Instantiates the FileIO through Iceberg's {@link CatalogUtil#loadFileIO} with a fresh Hadoop
+ * {@link Configuration}. Package-private and marked {@link VisibleForTesting} as a test seam.
+ */
+ @VisibleForTesting
+ FileIO loadFileIOInternal(
+ @Nonnull String ioImplClassName, @Nonnull Map<String, String> properties) {
+ return CatalogUtil.loadFileIO(ioImplClassName, properties, new Configuration());
}
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
index ca3c0851..905441d7 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
@@ -18,10 +18,44 @@
*/
package org.apache.polaris.service.catalog.io;
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
-/** Interface for providing a way to construct FileIO objects, such as for
reading/writing S3. */
+/**
+ * Factory for Iceberg {@link FileIO} instances, such as for reading/writing S3.
+ *
+ * <p>Implementations are available via CDI as {@link ApplicationScoped @ApplicationScoped} beans
+ * and are distinguished by an {@code @Identifier} (for example {@code "default"} or {@code "wasb"}).
+ */
public interface FileIOFactory {
- FileIO loadFileIO(String impl, Map<String, String> properties);
+
+ /**
+ * Loads a FileIO implementation for a specific table in the given realm with detailed config.
+ *
+ * <p>This method may obtain subscoped credentials to restrict the FileIO's permissions, ensuring
+ * secure and limited access to the table's data and locations.
+ *
+ * @param realmId the realm for which the FileIO is being loaded.
+ * @param ioImplClassName fully qualified class name of the {@link FileIO} implementation to load.
+ * @param properties base FileIO configuration properties (e.g. catalog or table properties);
+ *     implementations may layer additional entries, such as vended credentials, on top of them.
+ * @param identifier the table (or table-like entity) the FileIO is loaded for; implementations
+ *     may use it as context, e.g. for logging.
+ * @param tableLocations storage locations the FileIO needs to access.
+ * @param storageActions storage actions the FileIO must be able to perform on {@code
+ *     tableLocations}.
+ * @param resolvedEntityPath resolved entity path (e.g. catalog, namespaces, table) that
+ *     implementations may search for the entity defining the storage configuration.
+ * @return a configured FileIO instance; callers should close it when it is no longer needed.
+ */
+ FileIO loadFileIO(
+ @Nonnull RealmId realmId,
+ @Nonnull String ioImplClassName,
+ @Nonnull Map<String, String> properties,
+ @Nonnull TableIdentifier identifier,
+ @Nonnull Set<String> tableLocations,
+ @Nonnull Set<PolarisStorageActions> storageActions,
+ @Nonnull PolarisResolvedPathWrapper resolvedEntityPath);
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
new file mode 100644
index 00000000..9775ebd8
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.io;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for locating the storage configuration in a resolved entity hierarchy and for
+ * obtaining subscoped storage credentials for it.
+ */
+public final class FileIOUtil {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileIOUtil.class);
+
+ private FileIOUtil() {}
+
+ /**
+ * Finds storage configuration information in the hierarchy of the resolved storage entity.
+ *
+ * <p>This method starts at the "leaf" level (e.g., table) and walks "upwards" through namespaces
+ * in the hierarchy to the "root." It returns the first entity whose internal properties contain
+ * the key {@link PolarisEntityConstants#getStorageConfigInfoPropertyName()}.
+ *
+ * @param resolvedStorageEntity the resolved entity wrapper containing the hierarchical path
+ * @return an {@link Optional} containing the entity with storage config, or empty if not found
+ */
+ public static Optional<PolarisEntity> findStorageInfoFromHierarchy(
+ PolarisResolvedPathWrapper resolvedStorageEntity) {
+ String storageConfigKey = PolarisEntityConstants.getStorageConfigInfoPropertyName();
+ // The raw path is reversed so the search starts at the leaf and moves towards the root.
+ return resolvedStorageEntity.getRawFullPath().reversed().stream()
+ .filter(e -> e.getInternalPropertiesAsMap().containsKey(storageConfigKey))
+ .findFirst();
+ }
+
+ /**
+ * Refreshes or generates subscoped creds for accessing table storage based on the params.
+ *
+ * <p>Use cases:
+ *
+ * <ul>
+ * <li>In {@link BasePolarisCatalog}, subscoped credentials are generated or refreshed when the
+ * client sends a loadTable request to vend credentials.
+ * <li>In {@link DefaultFileIOFactory}, subscoped credentials are obtained to access the storage
+ * and read/write metadata JSON files.
+ * </ul>
+ *
+ * <p>{@code tableLocations} are always requested. They are additionally requested as write
+ * locations only when WRITE, DELETE or ALL is among {@code storageActions}. Listing is allowed
+ * only for LIST or ALL.
+ *
+ * @param realmId realm whose configuration is consulted
+ * @param entityManager supplies the credential cache used to obtain the credentials
+ * @param credentialVendor vendor handed to the credential cache to generate credentials
+ * @param metaStoreSession metastore session handed to the credential cache
+ * @param configurationStore store queried for {@code SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION}
+ * @param tableIdentifier the table the credentials are for; used only for logging
+ * @param tableLocations storage locations the credentials must cover
+ * @param storageActions storage actions the credentials must permit
+ * @param entity the entity that defines the storage configuration
+ * @return the credential properties; empty if credential subscoping is skipped or the cache
+ *     produced no credentials
+ */
+ public static Map<String, String> refreshCredentials(
+ RealmId realmId,
+ PolarisEntityManager entityManager,
+ PolarisCredentialVendor credentialVendor,
+ PolarisMetaStoreSession metaStoreSession,
+ PolarisConfigurationStore configurationStore,
+ TableIdentifier tableIdentifier,
+ Set<String> tableLocations,
+ Set<PolarisStorageActions> storageActions,
+ PolarisEntity entity) {
+ // Kept boxed and compared against Boolean.TRUE so that a null value never causes an
+ // unboxing NullPointerException; null is treated as "do not skip".
+ Boolean skipCredentialSubscopingIndirection =
+ configurationStore.getConfiguration(
+ realmId,
+ PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
+ PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+ if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
+ LOGGER
+ .atDebug()
+ .addKeyValue("tableIdentifier", tableIdentifier)
+ .log("Skipping generation of subscoped creds for table");
+ return Map.of();
+ }
+
+ boolean allActions = storageActions.contains(PolarisStorageActions.ALL);
+ boolean allowList = allActions || storageActions.contains(PolarisStorageActions.LIST);
+ // Only mutating actions (WRITE, DELETE, or ALL) request write access to the table locations.
+ Set<String> writeLocations =
+ (allActions
+ || storageActions.contains(PolarisStorageActions.WRITE)
+ || storageActions.contains(PolarisStorageActions.DELETE))
+ ? tableLocations
+ : Set.of();
+ Map<String, String> credentialsMap =
+ entityManager
+ .getCredentialCache()
+ .getOrGenerateSubScopeCreds(
+ credentialVendor,
+ metaStoreSession,
+ entity,
+ allowList,
+ tableLocations,
+ writeLocations);
+ LOGGER
+ .atDebug()
+ .addKeyValue("tableIdentifier", tableIdentifier)
+ .addKeyValue("credentialKeys", credentialsMap.keySet())
+ .log("Loaded scoped credentials for table");
+ if (credentialsMap.isEmpty()) {
+ LOGGER
+ .atDebug()
+ .addKeyValue("tableIdentifier", tableIdentifier)
+ .log("No credentials found for table");
+ }
+ return credentialsMap;
+ }
+}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
index 469587a3..fe2cd0a7 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
@@ -19,19 +19,54 @@
package org.apache.polaris.service.catalog.io;
import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
/** A {@link FileIOFactory} that translates WASB paths to ABFS ones */
@ApplicationScoped
@Identifier("wasb")
public class WasbTranslatingFileIOFactory implements FileIOFactory {
+
+ // Delegate that performs credential vending and FileIO instantiation; its results are wrapped
+ // in a WasbTranslatingFileIO.
+ private final FileIOFactory defaultFileIOFactory;
+
+ // NOTE(review): the delegate is created with `new DefaultFileIOFactory(...)` rather than
+ // injected, so it always uses DefaultFileIOFactory's behavior regardless of any other
+ // FileIOFactory bean; confirm this is intended.
+ @Inject
+ public WasbTranslatingFileIOFactory(
+ RealmEntityManagerFactory realmEntityManagerFactory,
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ PolarisConfigurationStore configurationStore) {
+ defaultFileIOFactory =
+ new DefaultFileIOFactory(
+ realmEntityManagerFactory, metaStoreManagerFactory,
configurationStore);
+ }
+
+ // Loads the FileIO through the default factory with all arguments passed through unchanged,
+ // then wraps it so WASB paths are translated to ABFS ones.
@Override
- public FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
+ public FileIO loadFileIO(
+ @Nonnull RealmId realmId,
+ @Nonnull String ioImplClassName,
+ @Nonnull Map<String, String> properties,
+ @Nonnull TableIdentifier identifier,
+ @Nonnull Set<String> tableLocations,
+ @Nonnull Set<PolarisStorageActions> storageActions,
+ @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
return new WasbTranslatingFileIO(
- CatalogUtil.loadFileIO(ioImpl, properties, new Configuration()));
+ defaultFileIOFactory.loadFileIO(
+ realmId,
+ ioImplClassName,
+ properties,
+ identifier,
+ tableLocations,
+ storageActions,
+ resolvedEntityPath));
}
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
index 71d2660e..7fa5802e 100644
---
a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
+++
b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
@@ -21,68 +21,51 @@ package org.apache.polaris.service.task;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
-import org.apache.polaris.core.PolarisConfiguration;
-import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.context.RealmId;
import org.apache.polaris.core.entity.PolarisTaskConstants;
+import org.apache.polaris.core.entity.TableLikeEntity;
import org.apache.polaris.core.entity.TaskEntity;
-import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.io.FileIOFactory;
+/**
+ * Supplies a {@link FileIO} for a task entity in a given realm. Credential handling is delegated
+ * to the injected {@link FileIOFactory}.
+ */
@ApplicationScoped
public class TaskFileIOSupplier implements BiFunction<TaskEntity, RealmId,
FileIO> {
- private final MetaStoreManagerFactory metaStoreManagerFactory;
private final FileIOFactory fileIOFactory;
- private final PolarisConfigurationStore configurationStore;
@Inject
- public TaskFileIOSupplier(
- MetaStoreManagerFactory metaStoreManagerFactory,
- FileIOFactory fileIOFactory,
- PolarisConfigurationStore configurationStore) {
- this.metaStoreManagerFactory = metaStoreManagerFactory;
+ public TaskFileIOSupplier(FileIOFactory fileIOFactory) {
this.fileIOFactory = fileIOFactory;
- this.configurationStore = configurationStore;
}
@Override
public FileIO apply(TaskEntity task, RealmId realmId) {
Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
- String location =
internalProperties.get(PolarisTaskConstants.STORAGE_LOCATION);
- PolarisMetaStoreManager metaStoreManager =
- metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
- PolarisMetaStoreSession metaStoreSession =
- metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
Map<String, String> properties = new HashMap<>(internalProperties);
- Boolean skipCredentialSubscopingIndirection =
- configurationStore.getConfiguration(
- realmId,
- PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
-
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+ // The task entity is wrapped as a table-like entity only to derive an identifier; in
+ // DefaultFileIOFactory that identifier is used solely as logging context.
+ TableLikeEntity tableEntity = TableLikeEntity.of(task);
+ TableIdentifier identifier = tableEntity.getTableIdentifier();
+ String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION);
+ // NOTE(review): Set.of(location) throws NullPointerException when the task lacks the
+ // STORAGE_LOCATION internal property. Previously the set was built only when credential
+ // subscoping was not skipped; confirm every task sets this property.
+ Set<String> locations = Set.of(location);
+ Set<PolarisStorageActions> storageActions =
Set.of(PolarisStorageActions.ALL);
+ // The task entity alone forms the resolved path searched for a storage configuration.
+ ResolvedPolarisEntity resolvedTaskEntity =
+ new ResolvedPolarisEntity(task, List.of(), List.of());
+ PolarisResolvedPathWrapper resolvedPath =
+ new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity));
- if (!skipCredentialSubscopingIndirection) {
- properties.putAll(
- metaStoreManagerFactory
- .getOrCreateStorageCredentialCache(realmId)
- .getOrGenerateSubScopeCreds(
- metaStoreManager,
- metaStoreSession,
- task,
- true,
- Set.of(location),
- Set.of(location)));
- }
+ // Fall back to ResolvingFileIO when the task does not name a FileIO implementation.
String ioImpl =
properties.getOrDefault(
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.io.ResolvingFileIO");
- return fileIOFactory.loadFileIO(ioImpl, properties);
+
+ return fileIOFactory.loadFileIO(
+ realmId, ioImpl, properties, identifier, locations, storageActions,
resolvedPath);
}
}
diff --git
a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
new file mode 100644
index 00000000..6455c168
--- /dev/null
+++
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.io;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import jakarta.annotation.Nonnull;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.*;
+import org.apache.polaris.core.persistence.*;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
+import org.apache.polaris.service.task.TaskFileIOSupplier;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.mockito.Mockito;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+public class FileIOFactoryTest {
+
+ public static final String CATALOG_NAME = "polaris-catalog";
+ public static final Namespace NS = Namespace.of("newdb");
+ public static final TableIdentifier TABLE = TableIdentifier.of(NS, "table");
+ public static final Schema SCHEMA =
+ new Schema(
+ required(3, "id", Types.IntegerType.get(), "unique ID 🤪"),
+ required(4, "data", Types.StringType.get()));
+ public static final String TEST_ACCESS_KEY = "test_access_key";
+ public static final String SECRET_ACCESS_KEY = "secret_access_key";
+ public static final String SESSION_TOKEN = "session_token";
+
+ private RealmId realmId;
+ private StsClient stsClient;
+ private TestServices testServices;
+
+ @BeforeEach
+ public void before(TestInfo testInfo) {
+ String realmName =
+ "realm_%s_%s"
+ .formatted(
+ testInfo.getTestMethod().map(Method::getName).orElse("test"),
System.nanoTime());
+ realmId = RealmId.newRealmId(realmName);
+
+ // Mock get subscoped creds
+ stsClient = Mockito.mock(StsClient.class);
+ when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
+ .thenReturn(
+ AssumeRoleResponse.builder()
+ .credentials(
+ Credentials.builder()
+ .accessKeyId(TEST_ACCESS_KEY)
+ .secretAccessKey(SECRET_ACCESS_KEY)
+ .sessionToken(SESSION_TOKEN)
+ .build())
+ .build());
+
+ // Spy FileIOFactory and check if the credentials are passed to the FileIO
+ TestServices.FileIOFactorySupplier fileIOFactorySupplier =
+ (entityManagerFactory, metaStoreManagerFactory, configurationStore) ->
+ Mockito.spy(
+ new DefaultFileIOFactory(
+ entityManagerFactory, metaStoreManagerFactory,
configurationStore) {
+ @Override
+ FileIO loadFileIOInternal(
+ @Nonnull String ioImplClassName, @Nonnull Map<String,
String> properties) {
+ // properties should contain credentials
+ Assertions.assertThat(properties)
+ .containsEntry(S3FileIOProperties.ACCESS_KEY_ID,
TEST_ACCESS_KEY)
+ .containsEntry(S3FileIOProperties.SECRET_ACCESS_KEY,
SECRET_ACCESS_KEY)
+ .containsEntry(S3FileIOProperties.SESSION_TOKEN,
SESSION_TOKEN);
+ return super.loadFileIOInternal(ioImplClassName,
properties);
+ }
+ });
+
+ testServices =
+ TestServices.builder()
+ .config(Map.of("ALLOW_SPECIFYING_FILE_IO_IMPL", true))
+ .realmId(realmId)
+ .stsClient(stsClient)
+ .fileIOFactorySupplier(fileIOFactorySupplier)
+ .build();
+ }
+
+ @AfterEach
+ public void after() {}
+
+ @Test
+ public void testLoadFileIOForTableLike() {
+ BasePolarisCatalog catalog = createCatalog(testServices);
+ catalog.createNamespace(NS);
+ catalog.createTable(TABLE, SCHEMA);
+
+ // 1. BasePolarisCatalog:doCommit: for writing the table during the
creation
+ Mockito.verify(testServices.fileIOFactory(), Mockito.times(1))
+ .loadFileIO(
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any());
+ }
+
+ @Test
+ public void testLoadFileIOForCleanupTask() {
+ BasePolarisCatalog catalog = createCatalog(testServices);
+ catalog.createNamespace(NS);
+ catalog.createTable(TABLE, SCHEMA);
+ catalog.dropTable(TABLE, true);
+
+ List<PolarisBaseEntity> tasks =
+ testServices
+ .metaStoreManagerFactory()
+ .getOrCreateMetaStoreManager(realmId)
+ .loadTasks(
+
testServices.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(),
+ "testExecutor",
+ 1)
+ .getEntities();
+ Assertions.assertThat(tasks).hasSize(1);
+ TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
+ FileIO fileIO = new
TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, realmId);
+
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
+
+ // 1. BasePolarisCatalog:doCommit: for writing the table during the
creation
+ // 2. BasePolarisCatalog:doRefresh: for reading the table during the drop
+ // 3. TaskFileIOSupplier:apply: for clean up metadata files and merge files
+ Mockito.verify(testServices.fileIOFactory(), Mockito.times(3))
+ .loadFileIO(
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any());
+ }
+
+ BasePolarisCatalog createCatalog(TestServices services) {
+ String storageLocation = "s3://my-bucket/path/to/data";
+ AwsStorageConfigInfo awsStorageConfigInfo =
+ AwsStorageConfigInfo.builder()
+ .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+ .setAllowedLocations(List.of(storageLocation))
+ .setRoleArn("arn:aws:iam::012345678901:role/jdoe")
+ .build();
+
+ // Create Catalog Entity
+ Catalog catalog =
+ PolarisCatalog.builder()
+ .setType(Catalog.TypeEnum.INTERNAL)
+ .setName(CATALOG_NAME)
+ .setProperties(new CatalogProperties("s3://tmp/path/to/data"))
+ .setStorageConfigInfo(awsStorageConfigInfo)
+ .build();
+ services
+ .catalogsApi()
+ .createCatalog(
+ new CreateCatalogRequest(catalog), services.realmId(),
services.securityContext());
+
+ PolarisPassthroughResolutionView passthroughView =
+ new PolarisPassthroughResolutionView(
+ services.entityManagerFactory().getOrCreateEntityManager(realmId),
+
services.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(),
+ services.securityContext(),
+ CATALOG_NAME);
+ BasePolarisCatalog polarisCatalog =
+ new BasePolarisCatalog(
+ services.realmId(),
+ services.entityManagerFactory().getOrCreateEntityManager(realmId),
+
services.metaStoreManagerFactory().getOrCreateMetaStoreManager(realmId),
+
services.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(),
+ services.configurationStore(),
+ services.polarisDiagnostics(),
+ passthroughView,
+ services.securityContext(),
+ services.taskExecutor(),
+ services.fileIOFactory());
+ polarisCatalog.initialize(
+ CATALOG_NAME,
+ ImmutableMap.of(
+ org.apache.iceberg.CatalogProperties.FILE_IO_IMPL,
+ "org.apache.iceberg.inmemory.InMemoryFileIO"));
+ return polarisCatalog;
+ }
+}
diff --git
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
new file mode 100644
index 00000000..c09efede
--- /dev/null
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.GoogleCredentials;
+import jakarta.ws.rs.core.SecurityContext;
+import java.security.Principal;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PrincipalEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.service.admin.PolarisServiceImpl;
+import org.apache.polaris.service.admin.api.PolarisCatalogsApi;
+import org.apache.polaris.service.catalog.IcebergCatalogAdapter;
+import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi;
+import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
+import org.apache.polaris.service.config.DefaultConfigurationStore;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
+import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.apache.polaris.service.task.TaskExecutor;
+import org.assertj.core.util.TriFunction;
+import org.mockito.Mockito;
+import software.amazon.awssdk.services.sts.StsClient;
+
+public record TestServices(
+ PolarisCatalogsApi catalogsApi,
+ IcebergRestCatalogApi restApi,
+ PolarisConfigurationStore configurationStore,
+ PolarisDiagnostics polarisDiagnostics,
+ RealmEntityManagerFactory entityManagerFactory,
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ RealmId realmId,
+ SecurityContext securityContext,
+ FileIOFactory fileIOFactory,
+ TaskExecutor taskExecutor) {
+
+ private static final RealmId TEST_REALM =
+ org.apache.polaris.core.context.RealmId.newRealmId("test-realm");
+ private static final String GCP_ACCESS_TOKEN = "abc";
+
+ @FunctionalInterface
+ public interface FileIOFactorySupplier
+ extends TriFunction<
+ RealmEntityManagerFactory,
+ MetaStoreManagerFactory,
+ PolarisConfigurationStore,
+ FileIOFactory> {}
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private RealmId realmId = TEST_REALM;
+ private Map<String, Object> config = Map.of();
+ private StsClient stsClient = Mockito.mock(StsClient.class);
+ private FileIOFactorySupplier fileIOFactorySupplier =
MeasuredFileIOFactory::new;
+
+ private Builder() {}
+
+ public Builder realmId(RealmId realmId) {
+ this.realmId = realmId;
+ return this;
+ }
+
+ public Builder config(Map<String, Object> config) {
+ this.config = config;
+ return this;
+ }
+
+ public Builder fileIOFactorySupplier(FileIOFactorySupplier
fileIOFactorySupplier) {
+ this.fileIOFactorySupplier = fileIOFactorySupplier;
+ return this;
+ }
+
+ public Builder stsClient(StsClient stsClient) {
+ this.stsClient = stsClient;
+ return this;
+ }
+
+ public TestServices build() {
+ DefaultConfigurationStore configurationStore = new
DefaultConfigurationStore(config);
+ PolarisDiagnostics polarisDiagnostics =
Mockito.mock(PolarisDiagnostics.class);
+ PolarisAuthorizer authorizer = Mockito.mock(PolarisAuthorizer.class);
+
+ // Application level
+ PolarisStorageIntegrationProviderImpl storageIntegrationProvider =
+ new PolarisStorageIntegrationProviderImpl(
+ () -> stsClient,
+ () -> GoogleCredentials.create(new AccessToken(GCP_ACCESS_TOKEN,
new Date())),
+ configurationStore);
+ InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory =
+ new InMemoryPolarisMetaStoreManagerFactory(
+ storageIntegrationProvider,
+ configurationStore,
+ polarisDiagnostics,
+ Clock.systemDefaultZone());
+ RealmEntityManagerFactory realmEntityManagerFactory =
+ new RealmEntityManagerFactory(metaStoreManagerFactory,
polarisDiagnostics) {};
+
+ PolarisEntityManager entityManager =
+ realmEntityManagerFactory.getOrCreateEntityManager(realmId);
+ PolarisMetaStoreManager metaStoreManager =
+ metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
+ PolarisMetaStoreSession metaStoreSession =
+ metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
+
+ FileIOFactory fileIOFactory =
+ fileIOFactorySupplier.apply(
+ realmEntityManagerFactory, metaStoreManagerFactory,
configurationStore);
+
+ TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
+ IcebergRestCatalogApiService service =
+ new IcebergCatalogAdapter(
+ realmId,
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ polarisDiagnostics,
+ authorizer,
+ taskExecutor,
+ fileIOFactory);
+
+ IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
+
+ PolarisMetaStoreManager.CreatePrincipalResult createdPrincipal =
+ metaStoreManager.createPrincipal(
+ metaStoreSession,
+ new PrincipalEntity.Builder()
+ .setName("test-principal")
+ .setCreateTimestamp(Instant.now().toEpochMilli())
+ .setCredentialRotationRequiredState()
+ .build());
+ AuthenticatedPolarisPrincipal principal =
+ new AuthenticatedPolarisPrincipal(
+ PolarisEntity.of(createdPrincipal.getPrincipal()), Set.of());
+
+ SecurityContext securityContext =
+ new SecurityContext() {
+ @Override
+ public Principal getUserPrincipal() {
+ return principal;
+ }
+
+ @Override
+ public boolean isUserInRole(String s) {
+ return false;
+ }
+
+ @Override
+ public boolean isSecure() {
+ return true;
+ }
+
+ @Override
+ public String getAuthenticationScheme() {
+ return "";
+ }
+ };
+
+ PolarisCatalogsApi catalogsApi =
+ new PolarisCatalogsApi(
+ new PolarisServiceImpl(
+ entityManager,
+ metaStoreManager,
+ metaStoreSession,
+ configurationStore,
+ authorizer,
+ polarisDiagnostics));
+
+ return new TestServices(
+ catalogsApi,
+ restApi,
+ configurationStore,
+ polarisDiagnostics,
+ realmEntityManagerFactory,
+ metaStoreManagerFactory,
+ realmId,
+ securityContext,
+ fileIOFactory,
+ taskExecutor);
+ }
+ }
+}
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
similarity index 99%
rename from
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java
rename to
service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
index db74ad3b..54197d7e 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.quarkus.catalog;
+package org.apache.polaris.service.catalog;
import jakarta.ws.rs.core.SecurityContext;
import java.util.Arrays;
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java
similarity index 93%
rename from
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java
rename to
service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java
index 8acf2efd..0313f86f 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.quarkus.catalog.io;
+package org.apache.polaris.service.catalog.io;
import java.util.Map;
import java.util.Optional;
@@ -32,7 +32,7 @@ import org.apache.iceberg.io.OutputFile;
* File IO wrapper used for tests. It measures the number of bytes read, files
written, and files
* deleted. It can inject exceptions during InputFile and OutputFile creation.
*/
-public class TestFileIO implements FileIO {
+public class MeasuredFileIO implements FileIO {
private final FileIO io;
// When present, the following will be used to throw exceptions at various
parts of the IO
@@ -44,7 +44,7 @@ public class TestFileIO implements FileIO {
private int numOutputFiles;
private int numDeletedFiles;
- public TestFileIO(
+ public MeasuredFileIO(
FileIO io,
Optional<Supplier<RuntimeException>> newInputFileExceptionSupplier,
Optional<Supplier<RuntimeException>> newOutputFileExceptionSupplier,
@@ -73,9 +73,9 @@ public class TestFileIO implements FileIO {
throw s.get();
});
- // Use the inner's length in case the TestInputFile throws a getLength
exception
+ // Use the inner's length in case the MeasuredInputFile throws a getLength
exception
inputBytes += inner.getLength();
- return new TestInputFile(inner, getLengthExceptionSupplier);
+ return new MeasuredInputFile(inner, getLengthExceptionSupplier);
}
@Override
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java
similarity index 55%
rename from
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java
rename to
service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java
index 75047559..5826c1da 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java
@@ -16,26 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.quarkus.catalog.io;
+package org.apache.polaris.service.catalog.io;
+import jakarta.annotation.Nonnull;
import jakarta.enterprise.inject.Vetoed;
+import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Supplier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
-import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
/**
* A FileIOFactory that measures the number of bytes read, files written, and
files deleted. It can
* inject exceptions at various parts of the IO construction.
*/
@Vetoed
-public class TestFileIOFactory extends DefaultFileIOFactory {
- private final List<TestFileIO> ios = new ArrayList<>();
+public class MeasuredFileIOFactory implements FileIOFactory {
+ private final List<MeasuredFileIO> ios = new ArrayList<>();
// When present, the following will be used to throw exceptions at various
parts of the IO
public Optional<Supplier<RuntimeException>> loadFileIOExceptionSupplier =
Optional.empty();
@@ -43,18 +50,42 @@ public class TestFileIOFactory extends DefaultFileIOFactory
{
public Optional<Supplier<RuntimeException>> newOutputFileExceptionSupplier =
Optional.empty();
public Optional<Supplier<RuntimeException>> getLengthExceptionSupplier =
Optional.empty();
- public TestFileIOFactory() {}
+ private final FileIOFactory defaultFileIOFactory;
+
+ @Inject
+ public MeasuredFileIOFactory(
+ RealmEntityManagerFactory realmEntityManagerFactory,
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ PolarisConfigurationStore configurationStore) {
+ defaultFileIOFactory =
+ new DefaultFileIOFactory(
+ realmEntityManagerFactory, metaStoreManagerFactory,
configurationStore);
+ }
@Override
- public FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
+ public FileIO loadFileIO(
+ @Nonnull RealmId realmId,
+ @Nonnull String ioImplClassName,
+ @Nonnull Map<String, String> properties,
+ @Nonnull TableIdentifier identifier,
+ @Nonnull Set<String> tableLocations,
+ @Nonnull Set<PolarisStorageActions> storageActions,
+ @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
loadFileIOExceptionSupplier.ifPresent(
s -> {
throw s.get();
});
- TestFileIO wrapped =
- new TestFileIO(
- CatalogUtil.loadFileIO(ioImpl, properties, new Configuration()),
+ MeasuredFileIO wrapped =
+ new MeasuredFileIO(
+ defaultFileIOFactory.loadFileIO(
+ realmId,
+ ioImplClassName,
+ properties,
+ identifier,
+ tableLocations,
+ storageActions,
+ resolvedEntityPath),
newInputFileExceptionSupplier,
newOutputFileExceptionSupplier,
getLengthExceptionSupplier);
@@ -64,7 +95,7 @@ public class TestFileIOFactory extends DefaultFileIOFactory {
public long getInputBytes() {
long sum = 0;
- for (TestFileIO io : ios) {
+ for (MeasuredFileIO io : ios) {
sum += io.getInputBytes();
}
return sum;
@@ -72,7 +103,7 @@ public class TestFileIOFactory extends DefaultFileIOFactory {
public long getNumOutputFiles() {
long sum = 0;
- for (TestFileIO io : ios) {
+ for (MeasuredFileIO io : ios) {
sum += io.getNumOuptutFiles();
}
return sum;
@@ -80,7 +111,7 @@ public class TestFileIOFactory extends DefaultFileIOFactory {
public long getNumDeletedFiles() {
long sum = 0;
- for (TestFileIO io : ios) {
+ for (MeasuredFileIO io : ios) {
sum += io.getNumDeletedFiles();
}
return sum;
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java
similarity index 93%
rename from
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java
rename to
service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java
index da050c11..a4b2f236 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.quarkus.catalog.io;
+package org.apache.polaris.service.catalog.io;
import java.util.Optional;
import java.util.function.Supplier;
@@ -24,11 +24,11 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
/** An InputFile wrapper that can be forced to throw exceptions. */
-public class TestInputFile implements InputFile {
+public class MeasuredInputFile implements InputFile {
private final InputFile inputFile;
private final Optional<Supplier<RuntimeException>>
getLengthExceptionSupplier;
- public TestInputFile(
+ public MeasuredInputFile(
InputFile inputFile, Optional<Supplier<RuntimeException>>
getLengthExceptionSupplier) {
this.inputFile = inputFile;
this.getLengthExceptionSupplier = getLengthExceptionSupplier;