This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 23385a00 Extend FileIOFactory to Improve Customization (#724)
23385a00 is described below

commit 23385a00d89bc43e6b869eb0ea65ecb97b6c9bb7
Author: Rulin Xing <[email protected]>
AuthorDate: Wed Jan 29 13:35:50 2025 -0800

    Extend FileIOFactory to Improve Customization (#724)
    
    * Initial Commit
    
    * Fix test errors
    
    * Resolved some comments
    
    * Resolved some comments
    
    * Removed unused package
    
    * Make FileIOFactory as an ApplicationScoped bean
    
    * Revert some changes
    
    * Use FileIOFactory to get the creds for cleanup task
    
    * Use new interface in tests
    
    * Move some util classes to polaris-service-common as test fixtures
    
    * Delete the interface in FileIOFactory that is only used for testing
    
    * Refactor TestServices and use it in FileIOFactoryTest
    
    * Small change
    
    * Rename TestFileIOFactory to MeasuredFileIOFactory
    
    * Use FileIOFactory to load catalog default FileIO in tests, fix log level for FileIOUtil
    
    ---------
    
    Co-authored-by: Rulin Xing <[email protected]>
---
 quarkus/service/build.gradle.kts                   |   1 +
 .../polaris/service/quarkus/TestServices.java      | 163 ---------------
 .../quarkus/admin/ManagementServiceTest.java       |   9 +-
 .../quarkus/admin/PolarisAuthzTestBase.java        |   8 +-
 .../admin/PolarisOverlappingCatalogTest.java       |   6 +-
 .../quarkus/admin/PolarisOverlappingTableTest.java |   4 +-
 .../quarkus/catalog/BasePolarisCatalogTest.java    |  80 +++----
 .../catalog/BasePolarisCatalogViewTest.java        |   6 +-
 .../PolarisCatalogHandlerWrapperAuthzTest.java     |   3 +-
 .../quarkus/catalog/io/FileIOExceptionsTest.java   |   9 +-
 service/common/build.gradle.kts                    |  26 +++
 .../service/catalog/BasePolarisCatalog.java        | 135 ++++--------
 .../service/catalog/io/DefaultFileIOFactory.java   |  91 +++++++-
 .../polaris/service/catalog/io/FileIOFactory.java  |  38 +++-
 .../polaris/service/catalog/io/FileIOUtil.java     | 131 ++++++++++++
 .../catalog/io/WasbTranslatingFileIOFactory.java   |  43 +++-
 .../polaris/service/task/TaskFileIOSupplier.java   |  55 ++---
 .../service/catalog/io/FileIOFactoryTest.java      | 229 +++++++++++++++++++++
 .../org/apache/polaris/service/TestServices.java   | 217 +++++++++++++++++++
 .../catalog/PolarisPassthroughResolutionView.java  |   2 +-
 .../polaris/service/catalog/io/MeasuredFileIO.java |  10 +-
 .../service/catalog/io/MeasuredFileIOFactory.java  |  59 ++++--
 .../service/catalog/io/MeasuredInputFile.java      |   6 +-
 23 files changed, 947 insertions(+), 384 deletions(-)

diff --git a/quarkus/service/build.gradle.kts b/quarkus/service/build.gradle.kts
index 72646486..7ab5ec87 100644
--- a/quarkus/service/build.gradle.kts
+++ b/quarkus/service/build.gradle.kts
@@ -90,6 +90,7 @@ dependencies {
   testFixturesApi(project(":polaris-tests"))
 
   testImplementation(project(":polaris-api-management-model"))
+  testImplementation(testFixtures(project(":polaris-service-common")))
 
   
testImplementation("org.apache.iceberg:iceberg-api:${libs.versions.iceberg.get()}:tests")
   
testImplementation("org.apache.iceberg:iceberg-core:${libs.versions.iceberg.get()}:tests")
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java
deleted file mode 100644
index 81d34dd5..00000000
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.quarkus;
-
-import com.google.auth.oauth2.AccessToken;
-import com.google.auth.oauth2.GoogleCredentials;
-import jakarta.ws.rs.core.SecurityContext;
-import java.security.Principal;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-import org.apache.polaris.core.PolarisDiagnostics;
-import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
-import org.apache.polaris.core.auth.PolarisAuthorizer;
-import org.apache.polaris.core.context.RealmId;
-import org.apache.polaris.core.entity.PolarisEntity;
-import org.apache.polaris.core.entity.PrincipalEntity;
-import org.apache.polaris.core.persistence.PolarisEntityManager;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
-import org.apache.polaris.service.admin.PolarisServiceImpl;
-import org.apache.polaris.service.admin.api.PolarisCatalogsApi;
-import org.apache.polaris.service.catalog.IcebergCatalogAdapter;
-import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi;
-import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
-import org.apache.polaris.service.catalog.io.FileIOFactory;
-import org.apache.polaris.service.config.DefaultConfigurationStore;
-import org.apache.polaris.service.config.RealmEntityManagerFactory;
-import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
-import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
-import org.apache.polaris.service.task.TaskExecutor;
-import org.mockito.Mockito;
-
-public record TestServices(
-    IcebergRestCatalogApi restApi,
-    PolarisCatalogsApi catalogsApi,
-    RealmId realmId,
-    SecurityContext securityContext) {
-
-  private static final RealmId testRealm = RealmId.newRealmId("test-realm");
-
-  public static TestServices inMemory(Map<String, Object> config) {
-    return inMemory(new TestFileIOFactory(), config);
-  }
-
-  public static TestServices inMemory(FileIOFactory ioFactory) {
-    return inMemory(ioFactory, Map.of());
-  }
-
-  public static TestServices inMemory(FileIOFactory ioFactory, Map<String, 
Object> config) {
-
-    DefaultConfigurationStore configurationStore = new 
DefaultConfigurationStore(config);
-    PolarisDiagnostics polarisDiagnostics = 
Mockito.mock(PolarisDiagnostics.class);
-
-    PolarisStorageIntegrationProviderImpl storageIntegrationProvider =
-        new PolarisStorageIntegrationProviderImpl(
-            Mockito::mock,
-            () -> GoogleCredentials.create(new AccessToken("abc", new Date())),
-            configurationStore);
-
-    InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory =
-        new InMemoryPolarisMetaStoreManagerFactory(
-            storageIntegrationProvider,
-            configurationStore,
-            polarisDiagnostics,
-            Clock.systemDefaultZone());
-
-    PolarisMetaStoreManager metaStoreManager =
-        metaStoreManagerFactory.getOrCreateMetaStoreManager(testRealm);
-
-    PolarisMetaStoreSession session =
-        metaStoreManagerFactory.getOrCreateSessionSupplier(testRealm).get();
-
-    RealmEntityManagerFactory realmEntityManagerFactory =
-        new RealmEntityManagerFactory(metaStoreManagerFactory, 
polarisDiagnostics) {};
-
-    PolarisEntityManager entityManager =
-        realmEntityManagerFactory.getOrCreateEntityManager(testRealm);
-
-    PolarisAuthorizer authorizer = Mockito.mock(PolarisAuthorizer.class);
-
-    IcebergRestCatalogApiService service =
-        new IcebergCatalogAdapter(
-            testRealm,
-            entityManager,
-            metaStoreManager,
-            session,
-            configurationStore,
-            polarisDiagnostics,
-            authorizer,
-            Mockito.mock(TaskExecutor.class),
-            ioFactory);
-
-    IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
-
-    PolarisMetaStoreManager.CreatePrincipalResult createdPrincipal =
-        metaStoreManager.createPrincipal(
-            session,
-            new PrincipalEntity.Builder()
-                .setName("test-principal")
-                .setCreateTimestamp(Instant.now().toEpochMilli())
-                .setCredentialRotationRequiredState()
-                .build());
-
-    AuthenticatedPolarisPrincipal principal =
-        new AuthenticatedPolarisPrincipal(
-            PolarisEntity.of(createdPrincipal.getPrincipal()), Set.of());
-
-    SecurityContext securityContext =
-        new SecurityContext() {
-          @Override
-          public Principal getUserPrincipal() {
-            return principal;
-          }
-
-          @Override
-          public boolean isUserInRole(String s) {
-            return false;
-          }
-
-          @Override
-          public boolean isSecure() {
-            return true;
-          }
-
-          @Override
-          public String getAuthenticationScheme() {
-            return "";
-          }
-        };
-
-    PolarisCatalogsApi catalogsApi =
-        new PolarisCatalogsApi(
-            new PolarisServiceImpl(
-                entityManager,
-                metaStoreManager,
-                session,
-                configurationStore,
-                authorizer,
-                polarisDiagnostics));
-
-    return new TestServices(restApi, catalogsApi, testRealm, securityContext);
-  }
-}
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java
index 4af7b233..c4e10852 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java
@@ -32,15 +32,14 @@ import 
org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.PolarisCatalog;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.core.admin.model.UpdateCatalogRequest;
-import org.apache.polaris.service.quarkus.TestServices;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
+import org.apache.polaris.service.TestServices;
 import org.junit.jupiter.api.Test;
 
 public class ManagementServiceTest {
   static TestServices services =
-      TestServices.inMemory(
-          new TestFileIOFactory(),
-          Map.of("SUPPORTED_CATALOG_STORAGE_TYPES", List.of("S3", "GCS", 
"AZURE")));
+      TestServices.builder()
+          .config(Map.of("SUPPORTED_CATALOG_STORAGE_TYPES", List.of("S3", 
"GCS", "AZURE")))
+          .build();
 
   @Test
   public void testCreateCatalogWithDisallowedStorageConfig() {
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 485f4634..629951df 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -67,10 +67,11 @@ import 
org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.config.DefaultConfigurationStore;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
-import 
org.apache.polaris.service.quarkus.catalog.PolarisPassthroughResolutionView;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
@@ -160,6 +161,7 @@ public abstract class PolarisAuthzTestBase {
   protected PolarisEntityManager entityManager;
   protected PolarisMetaStoreManager metaStoreManager;
   protected PolarisMetaStoreSession metaStoreSession;
+  protected FileIOFactory fileIOFactory;
   protected PolarisBaseEntity catalogEntity;
   protected PrincipalEntity principalEntity;
   protected RealmId realmId;
@@ -385,6 +387,8 @@ public abstract class PolarisAuthzTestBase {
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
             entityManager, metaStoreSession, securityContext, CATALOG_NAME);
+    this.fileIOFactory =
+        new DefaultFileIOFactory(realmEntityManagerFactory, managerFactory, 
configurationStore);
     this.baseCatalog =
         new BasePolarisCatalog(
             realmId,
@@ -396,7 +400,7 @@ public abstract class PolarisAuthzTestBase {
             passthroughView,
             securityContext,
             Mockito.mock(),
-            new DefaultFileIOFactory());
+            fileIOFactory);
     this.baseCatalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
index 61a3fdc5..9e60049c 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
@@ -33,16 +33,14 @@ import org.apache.polaris.core.admin.model.Catalog;
 import org.apache.polaris.core.admin.model.CatalogProperties;
 import org.apache.polaris.core.admin.model.CreateCatalogRequest;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
-import org.apache.polaris.service.quarkus.TestServices;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
+import org.apache.polaris.service.TestServices;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
 public class PolarisOverlappingCatalogTest {
 
   static TestServices services =
-      TestServices.inMemory(
-          new TestFileIOFactory(), Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", 
"false"));
+      TestServices.builder().config(Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", 
"false")).build();
 
   private Response createCatalog(String prefix, String defaultBaseLocation, 
boolean isExternal) {
     return createCatalog(prefix, defaultBaseLocation, isExternal, new 
ArrayList<String>());
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
index 2791bfa7..03a5b723 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
@@ -36,7 +36,7 @@ import org.apache.polaris.core.admin.model.CatalogProperties;
 import org.apache.polaris.core.admin.model.CreateCatalogRequest;
 import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
-import org.apache.polaris.service.quarkus.TestServices;
+import org.apache.polaris.service.TestServices;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -105,7 +105,7 @@ public class PolarisOverlappingTableTest {
       Map<String, Object> serverConfig,
       Map<String, String> catalogConfig,
       int expectedStatusForOverlaps) {
-    TestServices services = TestServices.inMemory(serverConfig);
+    TestServices services = 
TestServices.builder().config(serverConfig).build();
 
     CatalogProperties.Builder propertiesBuilder =
         CatalogProperties.builder()
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
index 38580b9a..cab70907 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
@@ -23,7 +23,6 @@ import static 
org.apache.iceberg.types.Types.NestedField.required;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -96,11 +95,12 @@ import 
org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
 import org.apache.polaris.service.exception.IcebergExceptionMapper;
-import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TableCleanupTaskHandler;
 import org.apache.polaris.service.task.TaskExecutor;
@@ -163,6 +163,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
   private PolarisMetaStoreSession metaStoreSession;
   private PolarisAdminService adminService;
   private PolarisEntityManager entityManager;
+  private FileIOFactory fileIOFactory;
   private AuthenticatedPolarisPrincipal authenticatedRoot;
   private PolarisEntity catalogEntity;
   private SecurityContext securityContext;
@@ -241,22 +242,9 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
         new PolarisPassthroughResolutionView(
             entityManager, metaStoreSession, securityContext, CATALOG_NAME);
     TaskExecutor taskExecutor = Mockito.mock();
-    this.catalog =
-        new BasePolarisCatalog(
-            realmId,
-            entityManager,
-            metaStoreManager,
-            metaStoreSession,
-            configurationStore,
-            diagServices,
-            passthroughView,
-            securityContext,
-            taskExecutor,
-            new DefaultFileIOFactory());
-    this.catalog.initialize(
-        CATALOG_NAME,
-        ImmutableMap.of(
-            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+    this.fileIOFactory =
+        new DefaultFileIOFactory(entityManagerFactory, managerFactory, 
configurationStore);
+
     StsClient stsClient = Mockito.mock(StsClient.class);
     when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
         .thenReturn(
@@ -273,6 +261,23 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
     when(storageIntegrationProvider.getStorageIntegrationForConfig(
             isA(AwsStorageConfigurationInfo.class)))
         .thenReturn((PolarisStorageIntegration) storageIntegration);
+
+    this.catalog =
+        new BasePolarisCatalog(
+            realmId,
+            entityManager,
+            metaStoreManager,
+            metaStoreSession,
+            configurationStore,
+            diagServices,
+            passthroughView,
+            securityContext,
+            taskExecutor,
+            fileIOFactory);
+    this.catalog.initialize(
+        CATALOG_NAME,
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
   }
 
   @AfterEach
@@ -509,7 +514,8 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
     // filename.
     final String tableLocation = 
"s3://externally-owned-bucket/validate_table/";
     final String tableMetadataLocation = tableLocation + "metadata/";
-    FileIOFactory fileIoFactory = spy(new DefaultFileIOFactory());
+    FileIOFactory fileIoFactory =
+        spy(new DefaultFileIOFactory(entityManagerFactory, managerFactory, 
configurationStore));
     BasePolarisCatalog catalog =
         new BasePolarisCatalog(
             realmId,
@@ -541,7 +547,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
 
     doThrow(new ForbiddenException("Fake failure applying downscoped 
credentials"))
         .when(fileIoFactory)
-        .loadFileIO(any(), any());
+        .loadFileIO(any(), any(), any(), any(), any(), any(), any());
     Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, 
request))
         .isInstanceOf(ForbiddenException.class)
         .hasMessageContaining("Fake failure applying downscoped credentials");
@@ -849,7 +855,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
             passthroughView,
             securityContext,
             taskExecutor,
-            new DefaultFileIOFactory());
+            fileIOFactory);
     catalog.initialize(
         catalogWithoutStorage,
         ImmutableMap.of(
@@ -914,7 +920,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
             passthroughView,
             securityContext,
             taskExecutor,
-            new DefaultFileIOFactory());
+            fileIOFactory);
     catalog.initialize(
         catalogName,
         ImmutableMap.of(
@@ -1406,10 +1412,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
         .containsEntry(PolarisCredentialProperty.AWS_KEY_ID, TEST_ACCESS_KEY)
         .containsEntry(PolarisCredentialProperty.AWS_SECRET_KEY, 
SECRET_ACCESS_KEY)
         .containsEntry(PolarisCredentialProperty.AWS_TOKEN, SESSION_TOKEN);
-    FileIO fileIO =
-        new TaskFileIOSupplier(
-                createMockMetaStoreManagerFactory(), new 
DefaultFileIOFactory(), configurationStore)
-            .apply(taskEntity, realmId);
+    FileIO fileIO = new TaskFileIOSupplier(fileIOFactory).apply(taskEntity, 
realmId);
     
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
   }
 
@@ -1450,7 +1453,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
             passthroughView,
             securityContext,
             Mockito.mock(),
-            new DefaultFileIOFactory());
+            fileIOFactory);
     noPurgeCatalog.initialize(
         noPurgeCatalogName,
         ImmutableMap.of(
@@ -1522,7 +1525,8 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
         new PolarisPassthroughResolutionView(
             entityManager, metaStoreSession, securityContext, CATALOG_NAME);
 
-    TestFileIOFactory measured = new TestFileIOFactory();
+    MeasuredFileIOFactory measured =
+        new MeasuredFileIOFactory(entityManagerFactory, managerFactory, 
configurationStore);
     BasePolarisCatalog catalog =
         new BasePolarisCatalog(
             realmId,
@@ -1558,22 +1562,24 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
         .isGreaterThan(0);
 
     Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should 
succeed").isTrue();
+    TaskEntity taskEntity =
+        TaskEntity.of(
+            metaStoreManager
+                .loadTasks(metaStoreSession, "testExecutor", 1)
+                .getEntities()
+                .getFirst());
+    Map<String, String> properties = taskEntity.getInternalPropertiesAsMap();
+    properties.put(CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO");
+    taskEntity.setInternalPropertiesAsMap(properties);
     TableCleanupTaskHandler handler =
         new TableCleanupTaskHandler(
             Mockito.mock(),
             createMockMetaStoreManagerFactory(),
             configurationStore,
             diagServices,
-            (task, rc) ->
-                
measured.loadFileIO("org.apache.iceberg.inmemory.InMemoryFileIO", Map.of()),
+            new TaskFileIOSupplier(measured),
             clock);
-    handler.handleTask(
-        TaskEntity.of(
-            metaStoreManager
-                .loadTasks(metaStoreSession, "testExecutor", 1)
-                .getEntities()
-                .getFirst()),
-        realmId);
+    handler.handleTask(taskEntity, realmId);
     Assertions.assertThat(measured.getNumDeletedFiles()).as("A table was 
deleted").isGreaterThan(0);
   }
 }
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java
index b6bbefef..f5cdbcaf 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java
@@ -56,7 +56,9 @@ import 
org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.junit.jupiter.api.AfterEach;
@@ -171,6 +173,8 @@ public class BasePolarisCatalogViewTest extends 
ViewCatalogTests<BasePolarisCata
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
             entityManager, metaStoreSession, securityContext, CATALOG_NAME);
+    FileIOFactory fileIOFactory =
+        new DefaultFileIOFactory(entityManagerFactory, managerFactory, 
configurationStore);
     this.catalog =
         new BasePolarisCatalog(
             realmId,
@@ -182,7 +186,7 @@ public class BasePolarisCatalogViewTest extends 
ViewCatalogTests<BasePolarisCata
             passthroughView,
             securityContext,
             Mockito.mock(),
-            new DefaultFileIOFactory());
+            fileIOFactory);
     this.catalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
index 5f6b0dec..ef3b7fa5 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
@@ -58,7 +58,6 @@ import org.apache.polaris.core.entity.PolarisPrivilege;
 import org.apache.polaris.core.entity.PrincipalEntity;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.service.catalog.PolarisCatalogHandlerWrapper;
-import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
 import org.apache.polaris.service.types.NotificationRequest;
 import org.apache.polaris.service.types.NotificationType;
@@ -117,7 +116,7 @@ public class PolarisCatalogHandlerWrapperAuthzTest extends 
PolarisAuthzTestBase
         catalogName,
         polarisAuthorizer,
         Mockito.mock(),
-        new DefaultFileIOFactory());
+        fileIOFactory);
   }
 
   /**
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
index 13349c82..e78d9a68 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
@@ -38,7 +38,8 @@ import 
org.apache.polaris.core.admin.model.CreateCatalogRequest;
 import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.PolarisCatalog;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
-import org.apache.polaris.service.quarkus.TestServices;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -53,13 +54,13 @@ public class FileIOExceptionsTest {
   private static final String catalog = "test-catalog";
   private static final String catalogBaseLocation = 
"file:/tmp/buckets/my-bucket/path/to/data";
 
-  private static TestFileIOFactory ioFactory;
   private static TestServices services;
+  private static MeasuredFileIOFactory ioFactory;
 
   @BeforeAll
   public static void beforeAll() {
-    ioFactory = new TestFileIOFactory();
-    services = TestServices.inMemory(ioFactory);
+    services = TestServices.builder().build();
+    ioFactory = (MeasuredFileIOFactory) services.fileIOFactory();
 
     FileStorageConfigInfo storageConfigInfo =
         FileStorageConfigInfo.builder()
diff --git a/service/common/build.gradle.kts b/service/common/build.gradle.kts
index 79d2809c..58c99286 100644
--- a/service/common/build.gradle.kts
+++ b/service/common/build.gradle.kts
@@ -19,6 +19,7 @@
 
 plugins {
   id("polaris-server")
+  id("java-test-fixtures")
   alias(libs.plugins.jandex)
 }
 
@@ -93,6 +94,31 @@ dependencies {
   testImplementation(libs.assertj.core)
   testImplementation(libs.mockito.core)
   testRuntimeOnly("org.junit.platform:junit-platform-launcher")
+
+  testFixturesImplementation(project(":polaris-core"))
+  testFixturesImplementation(project(":polaris-api-management-model"))
+  testFixturesImplementation(project(":polaris-api-management-service"))
+  testFixturesImplementation(project(":polaris-api-iceberg-service"))
+
+  testFixturesImplementation(libs.jakarta.enterprise.cdi.api)
+  testFixturesImplementation(libs.jakarta.annotation.api)
+  testFixturesImplementation(libs.jakarta.ws.rs.api)
+
+  testFixturesImplementation(platform(libs.quarkus.bom))
+  testFixturesImplementation("io.quarkus:quarkus-rest-client")
+  testFixturesImplementation("io.quarkus:quarkus-rest-client-jackson")
+
+  testFixturesImplementation(platform(libs.iceberg.bom))
+  testFixturesImplementation("org.apache.iceberg:iceberg-api")
+  testFixturesImplementation("org.apache.iceberg:iceberg-core")
+  testFixturesImplementation("org.apache.iceberg:iceberg-aws")
+
+  testFixturesImplementation(platform(libs.google.cloud.storage.bom))
+  testFixturesImplementation("com.google.cloud:google-cloud-storage")
+  testFixturesImplementation(platform(libs.awssdk.bom))
+  testFixturesImplementation("software.amazon.awssdk:sts")
+  testFixturesImplementation("software.amazon.awssdk:iam-policy-builder")
+  testFixturesImplementation("software.amazon.awssdk:s3")
 }
 
 tasks.named("javadoc") { dependsOn("jandex") }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index 06bbc84d..91c18b48 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -93,6 +93,7 @@ import 
org.apache.polaris.core.persistence.PolarisEntityManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
 import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
 import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
 import org.apache.polaris.core.persistence.resolver.ResolverPath;
@@ -104,6 +105,7 @@ import 
org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
 import org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.StorageLocation;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOUtil;
 import org.apache.polaris.service.exception.IcebergExceptionMapper;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.apache.polaris.service.types.NotificationRequest;
@@ -324,7 +326,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
           String.format("Failed to fetch resolved parent for TableIdentifier 
'%s'", identifier));
     }
     FileIO fileIO =
-        refreshIOWithCredentials(
+        loadFileIOForTableLike(
             identifier,
             Set.of(locationDir),
             resolvedParent,
@@ -818,10 +820,15 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
           .log("Table entity has no storage configuration in its hierarchy");
       return Map.of();
     }
-    return refreshCredentials(
+    return FileIOUtil.refreshCredentials(
+        realmId,
+        entityManager,
+        getCredentialVendor(),
+        metaStoreSession,
+        configurationStore,
         tableIdentifier,
-        storageActions,
         getLocationsAllowedToBeAccessed(tableMetadata),
+        storageActions,
         storageInfo.get());
   }
 
@@ -857,62 +864,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
             ? resolvedEntityView.getResolvedPath(tableIdentifier.namespace())
             : resolvedTableEntities;
 
-    return findStorageInfoFromHierarchy(resolvedStorageEntity);
-  }
-
-  private Map<String, String> refreshCredentials(
-      TableIdentifier tableIdentifier,
-      Set<PolarisStorageActions> storageActions,
-      String tableLocation,
-      PolarisEntity entity) {
-    return refreshCredentials(tableIdentifier, storageActions, 
Set.of(tableLocation), entity);
-  }
-
-  private Map<String, String> refreshCredentials(
-      TableIdentifier tableIdentifier,
-      Set<PolarisStorageActions> storageActions,
-      Set<String> tableLocations,
-      PolarisEntity entity) {
-    Boolean skipCredentialSubscopingIndirection =
-        getBooleanContextConfiguration(
-            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
-            
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
-    if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
-      LOGGER
-          .atInfo()
-          .addKeyValue("tableIdentifier", tableIdentifier)
-          .log("Skipping generation of subscoped creds for table");
-      return Map.of();
-    }
-
-    boolean allowList =
-        storageActions.contains(PolarisStorageActions.LIST)
-            || storageActions.contains(PolarisStorageActions.ALL);
-    Set<String> writeLocations =
-        storageActions.contains(PolarisStorageActions.WRITE)
-                || storageActions.contains(PolarisStorageActions.DELETE)
-                || storageActions.contains(PolarisStorageActions.ALL)
-            ? tableLocations
-            : Set.of();
-    Map<String, String> credentialsMap =
-        entityManager
-            .getCredentialCache()
-            .getOrGenerateSubScopeCreds(
-                getCredentialVendor(),
-                metaStoreSession,
-                entity,
-                allowList,
-                tableLocations,
-                writeLocations);
-    LOGGER
-        .atDebug()
-        .addKeyValue("tableIdentifier", tableIdentifier)
-        .addKeyValue("credentialKeys", credentialsMap.keySet())
-        .log("Loaded scoped credentials for table");
-    if (credentialsMap.isEmpty()) {
-      LOGGER.debug("No credentials found for table");
-    }
-    return credentialsMap;
+    return FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity);
   }
 
   /**
@@ -1243,7 +1195,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
               // then we should use the actual current table properties for IO 
refresh here
               // instead of the general tableDefaultProperties.
               FileIO fileIO =
-                  refreshIOWithCredentials(
+                  loadFileIOForTableLike(
                       tableIdentifier,
                       Set.of(latestLocationDir),
                       resolvedEntities,
@@ -1279,7 +1231,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
 
       // refresh credentials because we need to read the metadata file to 
validate its location
       tableFileIO =
-          refreshIOWithCredentials(
+          loadFileIOForTableLike(
               tableIdentifier,
               getLocationsAllowedToBeAccessed(metadata),
               resolvedStorageEntity,
@@ -1418,18 +1370,6 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
     }
   }
 
-  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
-      PolarisResolvedPathWrapper resolvedStorageEntity) {
-    Optional<PolarisEntity> storageInfoEntity =
-        resolvedStorageEntity.getRawFullPath().reversed().stream()
-            .filter(
-                e ->
-                    e.getInternalPropertiesAsMap()
-                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
-            .findFirst();
-    return storageInfoEntity;
-  }
-
   private class BasePolarisViewOperations extends BaseViewOperations {
     private final TableIdentifier identifier;
     private final String fullViewName;
@@ -1475,7 +1415,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
               // then we should use the actual current table properties for IO 
refresh here
               // instead of the general tableDefaultProperties.
               FileIO fileIO =
-                  refreshIOWithCredentials(
+                  loadFileIOForTableLike(
                       identifier,
                       Set.of(latestLocationDir),
                       resolvedEntities,
@@ -1529,7 +1469,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
       Map<String, String> tableProperties = new 
HashMap<>(metadata.properties());
 
       viewFileIO =
-          refreshIOWithCredentials(
+          loadFileIOForTableLike(
               identifier,
               getLocationsAllowedToBeAccessed(metadata),
               resolvedStorageEntity,
@@ -1586,27 +1526,22 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
     }
   }
 
-  private FileIO refreshIOWithCredentials(
+  private FileIO loadFileIOForTableLike(
       TableIdentifier identifier,
       Set<String> readLocations,
       PolarisResolvedPathWrapper resolvedStorageEntity,
       Map<String, String> tableProperties,
       Set<PolarisStorageActions> storageActions) {
-    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
-    Map<String, String> credentialsMap =
-        storageInfoEntity
-            .map(
-                storageInfo ->
-                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
-            .orElse(Map.of());
-
-    // Update the FileIO before we write the new metadata file
-    // update with table properties in case there are table-level overrides
-    // the credentials should always override table-level properties, since
-    // storage configuration will be found at whatever entity defines it
-    tableProperties.putAll(credentialsMap);
-    FileIO fileIO = null;
-    fileIO = loadFileIO(ioImplClassName, tableProperties);
+    // Reload fileIO based on table specific context
+    FileIO fileIO =
+        fileIOFactory.loadFileIO(
+            realmId,
+            ioImplClassName,
+            tableProperties,
+            identifier,
+            readLocations,
+            storageActions,
+            resolvedStorageEntity);
     // ensure the new fileIO is closed when the catalog is closed
     closeableGroup.addCloseable(fileIO);
     return fileIO;
@@ -1888,7 +1823,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
                     .toArray(String[]::new));
         resolvedStorageEntity = resolvedEntityView.getResolvedPath(nsLevel);
         if (resolvedStorageEntity != null) {
-          storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+          storageInfoEntity = 
FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity);
           break;
         }
       }
@@ -1905,7 +1840,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
 
       // Validate that we can construct a FileIO
       String locationDir = metadataLocation.substring(0, 
metadataLocation.lastIndexOf("/"));
-      refreshIOWithCredentials(
+      loadFileIOForTableLike(
           tableIdentifier,
           Set.of(locationDir),
           resolvedStorageEntity,
@@ -1960,7 +1895,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
       String locationDir = newLocation.substring(0, 
newLocation.lastIndexOf("/"));
 
       FileIO fileIO =
-          refreshIOWithCredentials(
+          loadFileIOForTableLike(
               tableIdentifier,
               Set.of(locationDir),
               resolvedParent,
@@ -2039,8 +1974,16 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
    * @return FileIO object
    */
   private FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
-    Map<String, String> propertiesWithS3CustomizedClientFactory = new 
HashMap<>(properties);
-    return fileIOFactory.loadFileIO(ioImpl, 
propertiesWithS3CustomizedClientFactory);
+    TableLikeEntity tableLikeEntity = TableLikeEntity.of(catalogEntity);
+    TableIdentifier identifier = tableLikeEntity.getTableIdentifier();
+    Set<String> locations = Set.of(catalogEntity.getDefaultBaseLocation());
+    ResolvedPolarisEntity resolvedCatalogEntity =
+        new ResolvedPolarisEntity(catalogEntity, List.of(), List.of());
+    PolarisResolvedPathWrapper resolvedPath =
+        new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity));
+    Set<PolarisStorageActions> storageActions = 
Set.of(PolarisStorageActions.ALL);
+    return fileIOFactory.loadFileIO(
+        realmId, ioImpl, properties, identifier, locations, storageActions, 
resolvedPath);
   }
 
   private void blockedUserSpecifiedWriteLocation(Map<String, String> 
properties) {
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
index b962ec79..a92d4c35 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
@@ -18,19 +18,104 @@
  */
 package org.apache.polaris.service.catalog.io;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
 
-/** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
+/**
+ * A default FileIO factory implementation for creating Iceberg {@link FileIO} 
instances with
+ * contextual table-level properties.
+ *
+ * <p>This class acts as a translation layer between Polaris properties and 
the properties required
+ * by Iceberg's {@link FileIO}. For example, it evaluates storage actions and 
retrieves subscoped
+ * credentials to initialize a {@link FileIO} instance with the most limited 
permissions necessary.
+ */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+
+  private final RealmEntityManagerFactory realmEntityManagerFactory;
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmEntityManagerFactory realmEntityManagerFactory,
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      PolarisConfigurationStore configurationStore) {
+    this.realmEntityManagerFactory = realmEntityManagerFactory;
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.configurationStore = configurationStore;
+  }
+
   @Override
-  public FileIO loadFileIO(String impl, Map<String, String> properties) {
-    return CatalogUtil.loadFileIO(impl, properties, new Configuration());
+  public FileIO loadFileIO(
+      @Nonnull RealmId realmId,
+      @Nonnull String ioImplClassName,
+      @Nonnull Map<String, String> properties,
+      @Nonnull TableIdentifier identifier,
+      @Nonnull Set<String> tableLocations,
+      @Nonnull Set<PolarisStorageActions> storageActions,
+      @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
+    PolarisEntityManager entityManager =
+        realmEntityManagerFactory.getOrCreateEntityManager(realmId);
+    PolarisCredentialVendor credentialVendor =
+        metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
+    PolarisMetaStoreSession metaStoreSession =
+        metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
+
+    // Get subscoped creds
+    properties = new HashMap<>(properties);
+    Optional<PolarisEntity> storageInfoEntity =
+        FileIOUtil.findStorageInfoFromHierarchy(resolvedEntityPath);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    FileIOUtil.refreshCredentials(
+                        realmId,
+                        entityManager,
+                        credentialVendor,
+                        metaStoreSession,
+                        configurationStore,
+                        identifier,
+                        tableLocations,
+                        storageActions,
+                        storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO with the subscoped credentials
+    // Update with properties in case there are table-level overrides; the 
credentials should
+    // always override table-level properties, since storage configuration 
will be found at
+    // whatever entity defines it
+    properties.putAll(credentialsMap);
+
+    return loadFileIOInternal(ioImplClassName, properties);
+  }
+
+  @VisibleForTesting
+  FileIO loadFileIOInternal(
+      @Nonnull String ioImplClassName, @Nonnull Map<String, String> 
properties) {
+    return CatalogUtil.loadFileIO(ioImplClassName, properties, new 
Configuration());
   }
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
index ca3c0851..905441d7 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
@@ -18,10 +18,44 @@
  */
 package org.apache.polaris.service.catalog.io;
 
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
 import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
 
-/** Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3. */
+/**
+ * Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3.
+ *
+ * <p>Implementations are available via CDI as {@link ApplicationScoped 
@ApplicationScoped} beans.
+ */
 public interface FileIOFactory {
-  FileIO loadFileIO(String impl, Map<String, String> properties);
+
+  /**
+   * Loads a FileIO implementation for a specific table in the given realm 
with detailed config.
+   *
+   * <p>This method may obtain subscoped credentials to restrict the FileIO's 
permissions, ensuring
+   * secure and limited access to the table's data and locations.
+   *
+   * @param realmId the realm for which the FileIO is being loaded.
+   * @param ioImplClassName the class name of the FileIO implementation to 
load.
+   * @param properties configuration properties for the FileIO.
+   * @param identifier the table identifier.
+   * @param tableLocations locations associated with the table.
+   * @param storageActions storage actions allowed for the table.
+   * @param resolvedEntityPath resolved paths for the entities.
+   * @return a configured FileIO instance.
+   */
+  FileIO loadFileIO(
+      @Nonnull RealmId realmId,
+      @Nonnull String ioImplClassName,
+      @Nonnull Map<String, String> properties,
+      @Nonnull TableIdentifier identifier,
+      @Nonnull Set<String> tableLocations,
+      @Nonnull Set<PolarisStorageActions> storageActions,
+      @Nonnull PolarisResolvedPathWrapper resolvedEntityPath);
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
new file mode 100644
index 00000000..9775ebd8
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.io;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileIOUtil {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FileIOUtil.class);
+
+  private FileIOUtil() {}
+
+  /**
+   * Finds storage configuration information in the hierarchy of the resolved 
storage entity.
+   *
+   * <p>This method starts at the "leaf" level (e.g., table) and walks 
"upwards" through namespaces
+   * in the hierarchy to the "root." It searches for the first entity 
containing storage config
+   * properties, identified using a key from {@link
+   * PolarisEntityConstants#getStorageConfigInfoPropertyName()}.
+   *
+   * @param resolvedStorageEntity the resolved entity wrapper containing the 
hierarchical path
+   * @return an {@link Optional} containing the entity with storage config, or 
empty if not found
+   */
+  public static Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  /**
+   * Refreshes or generates subscoped creds for accessing table storage based 
on the params.
+   *
+   * <p>Use cases:
+   *
+   * <ul>
+   *   <li>In {@link BasePolarisCatalog}, subscoped credentials are generated 
or refreshed when the
+   *       client sends a loadTable request to vend credentials.
+   *   <li>In {@link DefaultFileIOFactory}, subscoped credentials are obtained 
to access the storage
+   *       and read/write metadata JSON files.
+   * </ul>
+   */
+  public static Map<String, String> refreshCredentials(
+      RealmId realmId,
+      PolarisEntityManager entityManager,
+      PolarisCredentialVendor credentialVendor,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore configurationStore,
+      TableIdentifier tableIdentifier,
+      Set<String> tableLocations,
+      Set<PolarisStorageActions> storageActions,
+      PolarisEntity entity) {
+    boolean skipCredentialSubscopingIndirection =
+        configurationStore.getConfiguration(
+            realmId,
+            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
+            
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+    if (skipCredentialSubscopingIndirection) {
+      LOGGER
+          .atDebug()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Skipping generation of subscoped creds for table");
+      return Map.of();
+    }
+
+    boolean allowList =
+        storageActions.contains(PolarisStorageActions.LIST)
+            || storageActions.contains(PolarisStorageActions.ALL);
+    Set<String> writeLocations =
+        storageActions.contains(PolarisStorageActions.WRITE)
+                || storageActions.contains(PolarisStorageActions.DELETE)
+                || storageActions.contains(PolarisStorageActions.ALL)
+            ? tableLocations
+            : Set.of();
+    Map<String, String> credentialsMap =
+        entityManager
+            .getCredentialCache()
+            .getOrGenerateSubScopeCreds(
+                credentialVendor,
+                metaStoreSession,
+                entity,
+                allowList,
+                tableLocations,
+                writeLocations);
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .addKeyValue("credentialKeys", credentialsMap.keySet())
+        .log("Loaded scoped credentials for table");
+    if (credentialsMap.isEmpty()) {
+      LOGGER.debug("No credentials found for table");
+    }
+    return credentialsMap;
+  }
+}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
index 469587a3..fe2cd0a7 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java
@@ -19,19 +19,54 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
 
 /** A {@link FileIOFactory} that translates WASB paths to ABFS ones */
 @ApplicationScoped
 @Identifier("wasb")
 public class WasbTranslatingFileIOFactory implements FileIOFactory {
+
+  private final FileIOFactory defaultFileIOFactory;
+
+  @Inject
+  public WasbTranslatingFileIOFactory(
+      RealmEntityManagerFactory realmEntityManagerFactory,
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      PolarisConfigurationStore configurationStore) {
+    defaultFileIOFactory =
+        new DefaultFileIOFactory(
+            realmEntityManagerFactory, metaStoreManagerFactory, 
configurationStore);
+  }
+
   @Override
-  public FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
+  public FileIO loadFileIO(
+      @Nonnull RealmId realmId,
+      @Nonnull String ioImplClassName,
+      @Nonnull Map<String, String> properties,
+      @Nonnull TableIdentifier identifier,
+      @Nonnull Set<String> tableLocations,
+      @Nonnull Set<PolarisStorageActions> storageActions,
+      @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
     return new WasbTranslatingFileIO(
-        CatalogUtil.loadFileIO(ioImpl, properties, new Configuration()));
+        defaultFileIOFactory.loadFileIO(
+            realmId,
+            ioImplClassName,
+            properties,
+            identifier,
+            tableLocations,
+            storageActions,
+            resolvedEntityPath));
   }
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
 
b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
index 71d2660e..7fa5802e 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
@@ -21,68 +21,51 @@ package org.apache.polaris.service.task;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiFunction;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
-import org.apache.polaris.core.PolarisConfiguration;
-import org.apache.polaris.core.PolarisConfigurationStore;
 import org.apache.polaris.core.context.RealmId;
 import org.apache.polaris.core.entity.PolarisTaskConstants;
+import org.apache.polaris.core.entity.TableLikeEntity;
 import org.apache.polaris.core.entity.TaskEntity;
-import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.storage.PolarisStorageActions;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 
 @ApplicationScoped
 public class TaskFileIOSupplier implements BiFunction<TaskEntity, RealmId, 
FileIO> {
-  private final MetaStoreManagerFactory metaStoreManagerFactory;
   private final FileIOFactory fileIOFactory;
-  private final PolarisConfigurationStore configurationStore;
 
   @Inject
-  public TaskFileIOSupplier(
-      MetaStoreManagerFactory metaStoreManagerFactory,
-      FileIOFactory fileIOFactory,
-      PolarisConfigurationStore configurationStore) {
-    this.metaStoreManagerFactory = metaStoreManagerFactory;
+  public TaskFileIOSupplier(FileIOFactory fileIOFactory) {
     this.fileIOFactory = fileIOFactory;
-    this.configurationStore = configurationStore;
   }
 
   @Override
   public FileIO apply(TaskEntity task, RealmId realmId) {
     Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
-    String location = 
internalProperties.get(PolarisTaskConstants.STORAGE_LOCATION);
-    PolarisMetaStoreManager metaStoreManager =
-        metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
-    PolarisMetaStoreSession metaStoreSession =
-        metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
     Map<String, String> properties = new HashMap<>(internalProperties);
 
-    Boolean skipCredentialSubscopingIndirection =
-        configurationStore.getConfiguration(
-            realmId,
-            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
-            
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+    TableLikeEntity tableEntity = TableLikeEntity.of(task);
+    TableIdentifier identifier = tableEntity.getTableIdentifier();
+    String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION);
+    Set<String> locations = Set.of(location);
+    Set<PolarisStorageActions> storageActions = 
Set.of(PolarisStorageActions.ALL);
+    ResolvedPolarisEntity resolvedTaskEntity =
+        new ResolvedPolarisEntity(task, List.of(), List.of());
+    PolarisResolvedPathWrapper resolvedPath =
+        new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity));
 
-    if (!skipCredentialSubscopingIndirection) {
-      properties.putAll(
-          metaStoreManagerFactory
-              .getOrCreateStorageCredentialCache(realmId)
-              .getOrGenerateSubScopeCreds(
-                  metaStoreManager,
-                  metaStoreSession,
-                  task,
-                  true,
-                  Set.of(location),
-                  Set.of(location)));
-    }
     String ioImpl =
         properties.getOrDefault(
             CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.io.ResolvingFileIO");
-    return fileIOFactory.loadFileIO(ioImpl, properties);
+
+    return fileIOFactory.loadFileIO(
+        realmId, ioImpl, properties, identifier, locations, storageActions, 
resolvedPath);
   }
 }
diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
new file mode 100644
index 00000000..6455c168
--- /dev/null
+++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.io;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import jakarta.annotation.Nonnull;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.*;
+import org.apache.polaris.core.persistence.*;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
+import org.apache.polaris.service.task.TaskFileIOSupplier;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.mockito.Mockito;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+public class FileIOFactoryTest {
+
+  public static final String CATALOG_NAME = "polaris-catalog";
+  public static final Namespace NS = Namespace.of("newdb");
+  public static final TableIdentifier TABLE = TableIdentifier.of(NS, "table");
+  public static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get(), "unique ID 🤪"),
+          required(4, "data", Types.StringType.get()));
+  public static final String TEST_ACCESS_KEY = "test_access_key";
+  public static final String SECRET_ACCESS_KEY = "secret_access_key";
+  public static final String SESSION_TOKEN = "session_token";
+
+  private RealmId realmId;
+  private StsClient stsClient;
+  private TestServices testServices;
+
+  @BeforeEach
+  public void before(TestInfo testInfo) {
+    String realmName =
+        "realm_%s_%s"
+            .formatted(
+                testInfo.getTestMethod().map(Method::getName).orElse("test"), 
System.nanoTime());
+    realmId = RealmId.newRealmId(realmName);
+
+    // Mock get subscoped creds
+    stsClient = Mockito.mock(StsClient.class);
+    when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
+        .thenReturn(
+            AssumeRoleResponse.builder()
+                .credentials(
+                    Credentials.builder()
+                        .accessKeyId(TEST_ACCESS_KEY)
+                        .secretAccessKey(SECRET_ACCESS_KEY)
+                        .sessionToken(SESSION_TOKEN)
+                        .build())
+                .build());
+
+    // Spy FileIOFactory and check if the credentials are passed to the FileIO
+    TestServices.FileIOFactorySupplier fileIOFactorySupplier =
+        (entityManagerFactory, metaStoreManagerFactory, configurationStore) ->
+            Mockito.spy(
+                new DefaultFileIOFactory(
+                    entityManagerFactory, metaStoreManagerFactory, 
configurationStore) {
+                  @Override
+                  FileIO loadFileIOInternal(
+                      @Nonnull String ioImplClassName, @Nonnull Map<String, 
String> properties) {
+                    // properties should contain credentials
+                    Assertions.assertThat(properties)
+                        .containsEntry(S3FileIOProperties.ACCESS_KEY_ID, 
TEST_ACCESS_KEY)
+                        .containsEntry(S3FileIOProperties.SECRET_ACCESS_KEY, 
SECRET_ACCESS_KEY)
+                        .containsEntry(S3FileIOProperties.SESSION_TOKEN, 
SESSION_TOKEN);
+                    return super.loadFileIOInternal(ioImplClassName, 
properties);
+                  }
+                });
+
+    testServices =
+        TestServices.builder()
+            .config(Map.of("ALLOW_SPECIFYING_FILE_IO_IMPL", true))
+            .realmId(realmId)
+            .stsClient(stsClient)
+            .fileIOFactorySupplier(fileIOFactorySupplier)
+            .build();
+  }
+
+  @AfterEach
+  public void after() {}
+
+  @Test
+  public void testLoadFileIOForTableLike() {
+    BasePolarisCatalog catalog = createCatalog(testServices);
+    catalog.createNamespace(NS);
+    catalog.createTable(TABLE, SCHEMA);
+
+    // 1. BasePolarisCatalog:doCommit: for writing the table during the creation
+    Mockito.verify(testServices.fileIOFactory(), Mockito.times(1))
+        .loadFileIO(
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any());
+  }
+
+  @Test
+  public void testLoadFileIOForCleanupTask() {
+    BasePolarisCatalog catalog = createCatalog(testServices);
+    catalog.createNamespace(NS);
+    catalog.createTable(TABLE, SCHEMA);
+    catalog.dropTable(TABLE, true);
+
+    List<PolarisBaseEntity> tasks =
+        testServices
+            .metaStoreManagerFactory()
+            .getOrCreateMetaStoreManager(realmId)
+            .loadTasks(
+                
testServices.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(),
+                "testExecutor",
+                1)
+            .getEntities();
+    Assertions.assertThat(tasks).hasSize(1);
+    TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
+    FileIO fileIO = new 
TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, realmId);
+    
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
+
+    // 1. BasePolarisCatalog:doCommit: for writing the table during the creation
+    // 2. BasePolarisCatalog:doRefresh: for reading the table during the drop
+    // 3. TaskFileIOSupplier:apply: for clean up metadata files and merge files
+    Mockito.verify(testServices.fileIOFactory(), Mockito.times(3))
+        .loadFileIO(
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any());
+  }
+
+  BasePolarisCatalog createCatalog(TestServices services) {
+    String storageLocation = "s3://my-bucket/path/to/data";
+    AwsStorageConfigInfo awsStorageConfigInfo =
+        AwsStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setAllowedLocations(List.of(storageLocation))
+            .setRoleArn("arn:aws:iam::012345678901:role/jdoe")
+            .build();
+
+    // Create Catalog Entity
+    Catalog catalog =
+        PolarisCatalog.builder()
+            .setType(Catalog.TypeEnum.INTERNAL)
+            .setName(CATALOG_NAME)
+            .setProperties(new CatalogProperties("s3://tmp/path/to/data"))
+            .setStorageConfigInfo(awsStorageConfigInfo)
+            .build();
+    services
+        .catalogsApi()
+        .createCatalog(
+            new CreateCatalogRequest(catalog), services.realmId(), 
services.securityContext());
+
+    PolarisPassthroughResolutionView passthroughView =
+        new PolarisPassthroughResolutionView(
+            services.entityManagerFactory().getOrCreateEntityManager(realmId),
+            
services.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(),
+            services.securityContext(),
+            CATALOG_NAME);
+    BasePolarisCatalog polarisCatalog =
+        new BasePolarisCatalog(
+            services.realmId(),
+            services.entityManagerFactory().getOrCreateEntityManager(realmId),
+            
services.metaStoreManagerFactory().getOrCreateMetaStoreManager(realmId),
+            
services.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(),
+            services.configurationStore(),
+            services.polarisDiagnostics(),
+            passthroughView,
+            services.securityContext(),
+            services.taskExecutor(),
+            services.fileIOFactory());
+    polarisCatalog.initialize(
+        CATALOG_NAME,
+        ImmutableMap.of(
+            org.apache.iceberg.CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.inmemory.InMemoryFileIO"));
+    return polarisCatalog;
+  }
+}
diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
new file mode 100644
index 00000000..c09efede
--- /dev/null
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.GoogleCredentials;
+import jakarta.ws.rs.core.SecurityContext;
+import java.security.Principal;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PrincipalEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.service.admin.PolarisServiceImpl;
+import org.apache.polaris.service.admin.api.PolarisCatalogsApi;
+import org.apache.polaris.service.catalog.IcebergCatalogAdapter;
+import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi;
+import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
+import org.apache.polaris.service.config.DefaultConfigurationStore;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
+import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.apache.polaris.service.task.TaskExecutor;
+import org.assertj.core.util.TriFunction;
+import org.mockito.Mockito;
+import software.amazon.awssdk.services.sts.StsClient;
+
+public record TestServices(
+    PolarisCatalogsApi catalogsApi,
+    IcebergRestCatalogApi restApi,
+    PolarisConfigurationStore configurationStore,
+    PolarisDiagnostics polarisDiagnostics,
+    RealmEntityManagerFactory entityManagerFactory,
+    MetaStoreManagerFactory metaStoreManagerFactory,
+    RealmId realmId,
+    SecurityContext securityContext,
+    FileIOFactory fileIOFactory,
+    TaskExecutor taskExecutor) {
+
+  private static final RealmId TEST_REALM =
+      org.apache.polaris.core.context.RealmId.newRealmId("test-realm");
+  private static final String GCP_ACCESS_TOKEN = "abc";
+
+  @FunctionalInterface
+  public interface FileIOFactorySupplier
+      extends TriFunction<
+          RealmEntityManagerFactory,
+          MetaStoreManagerFactory,
+          PolarisConfigurationStore,
+          FileIOFactory> {}
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private RealmId realmId = TEST_REALM;
+    private Map<String, Object> config = Map.of();
+    private StsClient stsClient = Mockito.mock(StsClient.class);
+    private FileIOFactorySupplier fileIOFactorySupplier = 
MeasuredFileIOFactory::new;
+
+    private Builder() {}
+
+    public Builder realmId(RealmId realmId) {
+      this.realmId = realmId;
+      return this;
+    }
+
+    public Builder config(Map<String, Object> config) {
+      this.config = config;
+      return this;
+    }
+
+    public Builder fileIOFactorySupplier(FileIOFactorySupplier 
fileIOFactorySupplier) {
+      this.fileIOFactorySupplier = fileIOFactorySupplier;
+      return this;
+    }
+
+    public Builder stsClient(StsClient stsClient) {
+      this.stsClient = stsClient;
+      return this;
+    }
+
+    public TestServices build() {
+      DefaultConfigurationStore configurationStore = new 
DefaultConfigurationStore(config);
+      PolarisDiagnostics polarisDiagnostics = 
Mockito.mock(PolarisDiagnostics.class);
+      PolarisAuthorizer authorizer = Mockito.mock(PolarisAuthorizer.class);
+
+      // Application level
+      PolarisStorageIntegrationProviderImpl storageIntegrationProvider =
+          new PolarisStorageIntegrationProviderImpl(
+              () -> stsClient,
+              () -> GoogleCredentials.create(new AccessToken(GCP_ACCESS_TOKEN, 
new Date())),
+              configurationStore);
+      InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory =
+          new InMemoryPolarisMetaStoreManagerFactory(
+              storageIntegrationProvider,
+              configurationStore,
+              polarisDiagnostics,
+              Clock.systemDefaultZone());
+      RealmEntityManagerFactory realmEntityManagerFactory =
+          new RealmEntityManagerFactory(metaStoreManagerFactory, 
polarisDiagnostics) {};
+
+      PolarisEntityManager entityManager =
+          realmEntityManagerFactory.getOrCreateEntityManager(realmId);
+      PolarisMetaStoreManager metaStoreManager =
+          metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
+      PolarisMetaStoreSession metaStoreSession =
+          metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
+
+      FileIOFactory fileIOFactory =
+          fileIOFactorySupplier.apply(
+              realmEntityManagerFactory, metaStoreManagerFactory, 
configurationStore);
+
+      TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
+      IcebergRestCatalogApiService service =
+          new IcebergCatalogAdapter(
+              realmId,
+              entityManager,
+              metaStoreManager,
+              metaStoreSession,
+              configurationStore,
+              polarisDiagnostics,
+              authorizer,
+              taskExecutor,
+              fileIOFactory);
+
+      IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
+
+      PolarisMetaStoreManager.CreatePrincipalResult createdPrincipal =
+          metaStoreManager.createPrincipal(
+              metaStoreSession,
+              new PrincipalEntity.Builder()
+                  .setName("test-principal")
+                  .setCreateTimestamp(Instant.now().toEpochMilli())
+                  .setCredentialRotationRequiredState()
+                  .build());
+      AuthenticatedPolarisPrincipal principal =
+          new AuthenticatedPolarisPrincipal(
+              PolarisEntity.of(createdPrincipal.getPrincipal()), Set.of());
+
+      SecurityContext securityContext =
+          new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+              return principal;
+            }
+
+            @Override
+            public boolean isUserInRole(String s) {
+              return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+              return true;
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+              return "";
+            }
+          };
+
+      PolarisCatalogsApi catalogsApi =
+          new PolarisCatalogsApi(
+              new PolarisServiceImpl(
+                  entityManager,
+                  metaStoreManager,
+                  metaStoreSession,
+                  configurationStore,
+                  authorizer,
+                  polarisDiagnostics));
+
+      return new TestServices(
+          catalogsApi,
+          restApi,
+          configurationStore,
+          polarisDiagnostics,
+          realmEntityManagerFactory,
+          metaStoreManagerFactory,
+          realmId,
+          securityContext,
+          fileIOFactory,
+          taskExecutor);
+    }
+  }
+}
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
similarity index 99%
rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java
rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
index db74ad3b..54197d7e 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.quarkus.catalog;
+package org.apache.polaris.service.catalog;
 
 import jakarta.ws.rs.core.SecurityContext;
 import java.util.Arrays;
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java
similarity index 93%
rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java
rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java
index 8acf2efd..0313f86f 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.quarkus.catalog.io;
+package org.apache.polaris.service.catalog.io;
 
 import java.util.Map;
 import java.util.Optional;
@@ -32,7 +32,7 @@ import org.apache.iceberg.io.OutputFile;
  * File IO wrapper used for tests. It measures the number of bytes read, files written, and files
  * deleted. It can inject exceptions during InputFile and OutputFile creation.
  */
-public class TestFileIO implements FileIO {
+public class MeasuredFileIO implements FileIO {
   private final FileIO io;
 
   // When present, the following will be used to throw exceptions at various parts of the IO
@@ -44,7 +44,7 @@ public class TestFileIO implements FileIO {
   private int numOutputFiles;
   private int numDeletedFiles;
 
-  public TestFileIO(
+  public MeasuredFileIO(
       FileIO io,
       Optional<Supplier<RuntimeException>> newInputFileExceptionSupplier,
       Optional<Supplier<RuntimeException>> newOutputFileExceptionSupplier,
@@ -73,9 +73,9 @@ public class TestFileIO implements FileIO {
           throw s.get();
         });
 
-    // Use the inner's length in case the TestInputFile throws a getLength exception
+    // Use the inner's length in case the MeasuredInputFile throws a getLength exception
     inputBytes += inner.getLength();
-    return new TestInputFile(inner, getLengthExceptionSupplier);
+    return new MeasuredInputFile(inner, getLengthExceptionSupplier);
   }
 
   @Override
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java
similarity index 55%
rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java
rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java
index 75047559..5826c1da 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java
@@ -16,26 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.quarkus.catalog.io;
+package org.apache.polaris.service.catalog.io;
 
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.inject.Vetoed;
+import jakarta.inject.Inject;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Supplier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
-import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
 
 /**
  * A FileIOFactory that measures the number of bytes read, files written, and files deleted. It can
  * inject exceptions at various parts of the IO construction.
  */
 @Vetoed
-public class TestFileIOFactory extends DefaultFileIOFactory {
-  private final List<TestFileIO> ios = new ArrayList<>();
+public class MeasuredFileIOFactory implements FileIOFactory {
+  private final List<MeasuredFileIO> ios = new ArrayList<>();
 
   // When present, the following will be used to throw exceptions at various parts of the IO
   public Optional<Supplier<RuntimeException>> loadFileIOExceptionSupplier = 
Optional.empty();
@@ -43,18 +50,42 @@ public class TestFileIOFactory extends DefaultFileIOFactory {
   public Optional<Supplier<RuntimeException>> newOutputFileExceptionSupplier = 
Optional.empty();
   public Optional<Supplier<RuntimeException>> getLengthExceptionSupplier = 
Optional.empty();
 
-  public TestFileIOFactory() {}
+  private final FileIOFactory defaultFileIOFactory;
+
+  @Inject
+  public MeasuredFileIOFactory(
+      RealmEntityManagerFactory realmEntityManagerFactory,
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      PolarisConfigurationStore configurationStore) {
+    defaultFileIOFactory =
+        new DefaultFileIOFactory(
+            realmEntityManagerFactory, metaStoreManagerFactory, 
configurationStore);
+  }
 
   @Override
-  public FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
+  public FileIO loadFileIO(
+      @Nonnull RealmId realmId,
+      @Nonnull String ioImplClassName,
+      @Nonnull Map<String, String> properties,
+      @Nonnull TableIdentifier identifier,
+      @Nonnull Set<String> tableLocations,
+      @Nonnull Set<PolarisStorageActions> storageActions,
+      @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
     loadFileIOExceptionSupplier.ifPresent(
         s -> {
           throw s.get();
         });
 
-    TestFileIO wrapped =
-        new TestFileIO(
-            CatalogUtil.loadFileIO(ioImpl, properties, new Configuration()),
+    MeasuredFileIO wrapped =
+        new MeasuredFileIO(
+            defaultFileIOFactory.loadFileIO(
+                realmId,
+                ioImplClassName,
+                properties,
+                identifier,
+                tableLocations,
+                storageActions,
+                resolvedEntityPath),
             newInputFileExceptionSupplier,
             newOutputFileExceptionSupplier,
             getLengthExceptionSupplier);
@@ -64,7 +95,7 @@ public class TestFileIOFactory extends DefaultFileIOFactory {
 
   public long getInputBytes() {
     long sum = 0;
-    for (TestFileIO io : ios) {
+    for (MeasuredFileIO io : ios) {
       sum += io.getInputBytes();
     }
     return sum;
@@ -72,7 +103,7 @@ public class TestFileIOFactory extends DefaultFileIOFactory {
 
   public long getNumOutputFiles() {
     long sum = 0;
-    for (TestFileIO io : ios) {
+    for (MeasuredFileIO io : ios) {
       sum += io.getNumOuptutFiles();
     }
     return sum;
@@ -80,7 +111,7 @@ public class TestFileIOFactory extends DefaultFileIOFactory {
 
   public long getNumDeletedFiles() {
     long sum = 0;
-    for (TestFileIO io : ios) {
+    for (MeasuredFileIO io : ios) {
       sum += io.getNumDeletedFiles();
     }
     return sum;
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java
similarity index 93%
rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java
rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java
index da050c11..a4b2f236 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.quarkus.catalog.io;
+package org.apache.polaris.service.catalog.io;
 
 import java.util.Optional;
 import java.util.function.Supplier;
@@ -24,11 +24,11 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.SeekableInputStream;
 
 /** An InputFile wrapper that can be forced to throw exceptions. */
-public class TestInputFile implements InputFile {
+public class MeasuredInputFile implements InputFile {
   private final InputFile inputFile;
   private final Optional<Supplier<RuntimeException>> 
getLengthExceptionSupplier;
 
-  public TestInputFile(
+  public MeasuredInputFile(
       InputFile inputFile, Optional<Supplier<RuntimeException>> 
getLengthExceptionSupplier) {
     this.inputFile = inputFile;
     this.getLengthExceptionSupplier = getLengthExceptionSupplier;


Reply via email to