This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 308b103d5 Make StorageAccessConfigProvider request-scoped (#2974)
308b103d5 is described below

commit 308b103d5f25519851f3e5237538ccfa50481570
Author: Christopher Lambert <[email protected]>
AuthorDate: Tue Nov 11 17:48:27 2025 +0100

    Make StorageAccessConfigProvider request-scoped (#2974)
    
    - add `StorageCredentialsVendor` as request-scoped wrapper around `PolarisCredentialVendor`
    - make `FileIOFactory` request-scoped
    - make `TaskFileIOSupplier` request-scoped
---
 .../AtomicOperationMetaStoreManager.java           |   2 +-
 .../TransactionWorkspaceMetaStoreManager.java      |   4 +-
 .../TransactionalMetaStoreManagerImpl.java         |   2 +-
 .../core/storage/PolarisCredentialVendor.java      |   2 +-
 ...alVendor.java => StorageCredentialsVendor.java} |  54 ++++++---
 .../core/storage/cache/StorageCredentialCache.java |  32 +++--
 .../storage/cache/StorageCredentialCacheTest.java  | 130 +++++++--------------
 .../service/catalog/iceberg/IcebergCatalog.java    |  14 +--
 .../catalog/iceberg/IcebergCatalogHandler.java     |   1 -
 .../service/catalog/io/DefaultFileIOFactory.java   |   4 +-
 .../polaris/service/catalog/io/FileIOFactory.java  |   4 +-
 .../polaris/service/catalog/io/FileIOUtil.java     |  73 ------------
 .../catalog/io/StorageAccessConfigProvider.java    |  67 ++++++++---
 .../polaris/service/config/ServiceProducers.java   |   9 ++
 .../service/task/BatchFileCleanupTaskHandler.java  |   2 +-
 .../task/ManifestFileCleanupTaskHandler.java       |   2 +-
 .../service/task/TableCleanupTaskHandler.java      |   3 +-
 .../polaris/service/task/TaskFileIOSupplier.java   |  10 +-
 .../AbstractPolarisGenericTableCatalogTest.java    |   5 +-
 .../iceberg/AbstractIcebergCatalogTest.java        |   8 +-
 .../iceberg/AbstractIcebergCatalogViewTest.java    |   6 +-
 .../service/catalog/io/FileIOFactoryTest.java      |   2 +-
 .../catalog/policy/AbstractPolicyCatalogTest.java  |   5 +-
 .../org/apache/polaris/service/TestServices.java   |   5 +-
 24 files changed, 196 insertions(+), 250 deletions(-)

diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index 3ae2ac2c9..431520e9f 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -1596,7 +1596,7 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
       @Nonnull PolarisCallContext callCtx,
       long catalogId,
       long entityId,
-      PolarisEntityType entityType,
+      @Nonnull PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index 99c1f8162..b0c78c0b1 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -339,11 +339,11 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
   }
 
   @Override
-  public ScopedCredentialsResult getSubscopedCredsForEntity(
+  public @Nonnull ScopedCredentialsResult getSubscopedCredsForEntity(
       @Nonnull PolarisCallContext callCtx,
       long catalogId,
       long entityId,
-      PolarisEntityType entityType,
+      @Nonnull PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index 815e119d3..6602cf01a 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -2094,7 +2094,7 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
       @Nonnull PolarisCallContext callCtx,
       long catalogId,
       long entityId,
-      PolarisEntityType entityType,
+      @Nonnull PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
index d64e9ad88..ee90294c6 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
@@ -49,7 +49,7 @@ public interface PolarisCredentialVendor {
       @Nonnull PolarisCallContext callCtx,
       long catalogId,
       long entityId,
-      PolarisEntityType entityType,
+      @Nonnull PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java
similarity index 57%
copy from polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
copy to polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java
index d64e9ad88..59bcf86c8 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java
@@ -16,24 +16,41 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.polaris.core.storage;
 
 import jakarta.annotation.Nonnull;
 import java.util.Optional;
 import java.util.Set;
-import org.apache.polaris.core.PolarisCallContext;
-import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
 
-/** Manage credentials for storage locations. */
-public interface PolarisCredentialVendor {
+public class StorageCredentialsVendor {
+
+  private final PolarisCredentialVendor polarisCredentialVendor;
+  private final CallContext callContext;
+
+  public StorageCredentialsVendor(
+      PolarisCredentialVendor polarisCredentialVendor, CallContext 
callContext) {
+    this.polarisCredentialVendor = polarisCredentialVendor;
+    this.callContext = callContext;
+  }
+
+  public RealmContext getRealmContext() {
+    return callContext.getRealmContext();
+  }
+
+  public RealmConfig getRealmConfig() {
+    return callContext.getRealmConfig();
+  }
+
   /**
-   * Get a sub-scoped credentials for an entity against the provided allowed 
read and write
-   * locations.
+   * Get sub-scoped credentials for an entity against the provided allowed 
read and write locations.
    *
-   * @param callCtx the polaris call context
-   * @param catalogId the catalog id
-   * @param entityId the entity id
+   * @param entity the entity
    * @param allowListOperation whether to allow LIST operation on the 
allowedReadLocations and
    *     allowedWriteLocations
    * @param allowedReadLocations a set of allowed to read locations
@@ -45,13 +62,20 @@ public interface PolarisCredentialVendor {
    * @return an enum map containing the scoped credentials
    */
   @Nonnull
-  ScopedCredentialsResult getSubscopedCredsForEntity(
-      @Nonnull PolarisCallContext callCtx,
-      long catalogId,
-      long entityId,
-      PolarisEntityType entityType,
+  public ScopedCredentialsResult getSubscopedCredsForEntity(
+      @Nonnull PolarisEntity entity,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
-      Optional<String> refreshCredentialsEndpoint);
+      Optional<String> refreshCredentialsEndpoint) {
+    return polarisCredentialVendor.getSubscopedCredsForEntity(
+        callContext.getPolarisCallContext(),
+        entity.getCatalogId(),
+        entity.getId(),
+        entity.getType(),
+        allowListOperation,
+        allowedReadLocations,
+        allowedWriteLocations,
+        refreshCredentialsEndpoint);
+  }
 }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
index 93f5e351a..0f22863e5 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
@@ -30,15 +30,15 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import org.apache.iceberg.exceptions.UnprocessableEntityException;
-import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
-import org.apache.polaris.core.storage.PolarisCredentialVendor;
 import org.apache.polaris.core.storage.StorageAccessConfig;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,8 +95,8 @@ public class StorageCredentialCache {
   /**
    * Either get from the cache or generate a new entry for a scoped creds
    *
-   * @param credentialVendor the credential vendor used to generate a new 
scoped creds if needed
-   * @param callCtx the call context
+   * @param storageCredentialsVendor the credential vendor used to generate a 
new scoped creds if
+   *     needed
    * @param polarisEntity the polaris entity that is going to scoped creds
    * @param allowListOperation whether allow list action on the provided read 
and write locations
    * @param allowedReadLocations a set of allowed to read locations
@@ -104,20 +104,21 @@ public class StorageCredentialCache {
    * @return the a map of string containing the scoped creds information
    */
   public StorageAccessConfig getOrGenerateSubScopeCreds(
-      @Nonnull PolarisCredentialVendor credentialVendor,
-      @Nonnull PolarisCallContext callCtx,
+      @Nonnull StorageCredentialsVendor storageCredentialsVendor,
       @Nonnull PolarisEntity polarisEntity,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
       Optional<String> refreshCredentialsEndpoint) {
+    RealmContext realmContext = storageCredentialsVendor.getRealmContext();
+    RealmConfig realmConfig = storageCredentialsVendor.getRealmConfig();
     if (!isTypeSupported(polarisEntity.getType())) {
       diagnostics.fail(
           "entity_type_not_suppported_to_scope_creds", "type={}", 
polarisEntity.getType());
     }
     StorageCredentialCacheKey key =
         StorageCredentialCacheKey.of(
-            callCtx.getRealmContext().getRealmIdentifier(),
+            realmContext.getRealmIdentifier(),
             polarisEntity,
             allowListOperation,
             allowedReadLocations,
@@ -128,17 +129,14 @@ public class StorageCredentialCache {
         k -> {
           LOGGER.atDebug().log("StorageCredentialCache::load");
           ScopedCredentialsResult scopedCredentialsResult =
-              credentialVendor.getSubscopedCredsForEntity(
-                  callCtx,
-                  k.catalogId(),
-                  polarisEntity.getId(),
-                  polarisEntity.getType(),
-                  k.allowedListAction(),
-                  k.allowedReadLocations(),
-                  k.allowedWriteLocations(),
-                  k.refreshCredentialsEndpoint());
+              storageCredentialsVendor.getSubscopedCredsForEntity(
+                  polarisEntity,
+                  allowListOperation,
+                  allowedReadLocations,
+                  allowedWriteLocations,
+                  refreshCredentialsEndpoint);
           if (scopedCredentialsResult.isSuccess()) {
-            long maxCacheDurationMs = 
maxCacheDurationMs(callCtx.getRealmConfig());
+            long maxCacheDurationMs = maxCacheDurationMs(realmConfig);
             return new StorageCredentialCacheEntry(
                 scopedCredentialsResult.getStorageAccessConfig(), 
maxCacheDurationMs);
           }
diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
index f1e5ac1f6..c4db87231 100644
--- a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
+++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.polaris.core.storage.cache;
 
-import static 
org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS;
-
 import jakarta.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,63 +26,56 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.iceberg.exceptions.UnprocessableEntityException;
-import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.config.RealmConfigImpl;
+import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.dao.entity.BaseResult;
 import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
-import 
org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
-import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore;
-import 
org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl;
 import org.apache.polaris.core.storage.StorageAccessConfig;
 import org.apache.polaris.core.storage.StorageAccessProperty;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 public class StorageCredentialCacheTest {
   private final PolarisDiagnostics diagServices = new 
PolarisDefaultDiagServiceImpl();
-  private final PolarisCallContext callCtx;
-  private final StorageCredentialCacheConfig storageCredentialCacheConfig;
-  private final PolarisMetaStoreManager metaStoreManager;
+  private final RealmContext realmContext = () -> "testRealm";
+  private final RealmConfig realmConfig =
+      new RealmConfigImpl(new PolarisConfigurationStore() {}, realmContext);
+  private final StorageCredentialsVendor storageCredentialsVendor;
 
   private StorageCredentialCache storageCredentialCache;
 
   public StorageCredentialCacheTest() {
-    // the entity store, use treemap implementation
-    TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
-    // to interact with the metastore
-    TransactionalPersistence metaStore =
-        new TreeMapTransactionalPersistenceImpl(
-            diagServices, store, Mockito.mock(), RANDOM_SECRETS);
-    callCtx = new PolarisCallContext(() -> "testRealm", metaStore);
-    storageCredentialCacheConfig = () -> 10_000;
-    metaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
-    storageCredentialCache = newStorageCredentialCache();
+    storageCredentialsVendor = Mockito.mock(StorageCredentialsVendor.class);
+    
Mockito.when(storageCredentialsVendor.getRealmContext()).thenReturn(realmContext);
+    
Mockito.when(storageCredentialsVendor.getRealmConfig()).thenReturn(realmConfig);
   }
 
-  private StorageCredentialCache newStorageCredentialCache() {
-    return new StorageCredentialCache(diagServices, 
storageCredentialCacheConfig);
+  @BeforeEach
+  void beforeEach() {
+    StorageCredentialCacheConfig storageCredentialCacheConfig = () -> 10_000;
+    storageCredentialCache = new StorageCredentialCache(diagServices, 
storageCredentialCacheConfig);
   }
 
   @Test
   public void testBadResult() {
-    storageCredentialCache = newStorageCredentialCache();
     ScopedCredentialsResult badResult =
         new ScopedCredentialsResult(
             BaseResult.ReturnStatus.SUBSCOPE_CREDS_ERROR, "extra_error_info");
     Mockito.when(
-            metaStoreManager.getSubscopedCredsForEntity(
-                Mockito.any(),
-                Mockito.anyLong(),
-                Mockito.anyLong(),
+            storageCredentialsVendor.getSubscopedCredsForEntity(
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
@@ -98,8 +89,7 @@ public class StorageCredentialCacheTest {
     Assertions.assertThatThrownBy(
             () ->
                 storageCredentialCache.getOrGenerateSubScopeCreds(
-                    metaStoreManager,
-                    callCtx,
+                    storageCredentialsVendor,
                     polarisEntity,
                     true,
                     Set.of("s3://bucket1/path"),
@@ -111,14 +101,10 @@ public class StorageCredentialCacheTest {
 
   @Test
   public void testCacheHit() {
-    storageCredentialCache = newStorageCredentialCache();
     List<ScopedCredentialsResult> mockedScopedCreds =
         getFakeScopedCreds(3, /* expireSoon= */ false);
     Mockito.when(
-            metaStoreManager.getSubscopedCredsForEntity(
-                Mockito.any(),
-                Mockito.anyLong(),
-                Mockito.anyLong(),
+            storageCredentialsVendor.getSubscopedCredsForEntity(
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
@@ -134,8 +120,7 @@ public class StorageCredentialCacheTest {
 
     // add an item to the cache
     storageCredentialCache.getOrGenerateSubScopeCreds(
-        metaStoreManager,
-        callCtx,
+        storageCredentialsVendor,
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -145,8 +130,7 @@ public class StorageCredentialCacheTest {
 
     // subscope for the same entity and same allowed locations, will hit the 
cache
     storageCredentialCache.getOrGenerateSubScopeCreds(
-        metaStoreManager,
-        callCtx,
+        storageCredentialsVendor,
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -156,15 +140,11 @@ public class StorageCredentialCacheTest {
   }
 
   @RepeatedTest(10)
-  public void testCacheEvict() throws InterruptedException {
-    storageCredentialCache = newStorageCredentialCache();
+  public void testCacheEvict() throws Exception {
     List<ScopedCredentialsResult> mockedScopedCreds = getFakeScopedCreds(3, /* 
expireSoon= */ true);
 
     Mockito.when(
-            metaStoreManager.getSubscopedCredsForEntity(
-                Mockito.any(),
-                Mockito.anyLong(),
-                Mockito.anyLong(),
+            storageCredentialsVendor.getSubscopedCredsForEntity(
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
@@ -179,7 +159,7 @@ public class StorageCredentialCacheTest {
     PolarisEntity polarisEntity = new PolarisEntity(baseEntity);
     StorageCredentialCacheKey cacheKey =
         StorageCredentialCacheKey.of(
-            callCtx.getRealmContext().getRealmIdentifier(),
+            realmContext.getRealmIdentifier(),
             polarisEntity,
             true,
             Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -188,8 +168,7 @@ public class StorageCredentialCacheTest {
 
     // the entry will be evicted immediately because the token is expired
     storageCredentialCache.getOrGenerateSubScopeCreds(
-        metaStoreManager,
-        callCtx,
+        storageCredentialsVendor,
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -198,8 +177,7 @@ public class StorageCredentialCacheTest {
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
 
     storageCredentialCache.getOrGenerateSubScopeCreds(
-        metaStoreManager,
-        callCtx,
+        storageCredentialsVendor,
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -208,8 +186,7 @@ public class StorageCredentialCacheTest {
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
 
     storageCredentialCache.getOrGenerateSubScopeCreds(
-        metaStoreManager,
-        callCtx,
+        storageCredentialsVendor,
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -220,14 +197,10 @@ public class StorageCredentialCacheTest {
 
   @Test
   public void testCacheGenerateNewEntries() {
-    storageCredentialCache = newStorageCredentialCache();
     List<ScopedCredentialsResult> mockedScopedCreds =
         getFakeScopedCreds(3, /* expireSoon= */ false);
     Mockito.when(
-            metaStoreManager.getSubscopedCredsForEntity(
-                Mockito.any(),
-                Mockito.anyLong(),
-                Mockito.anyLong(),
+            storageCredentialsVendor.getSubscopedCredsForEntity(
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
@@ -241,8 +214,7 @@ public class StorageCredentialCacheTest {
     // different catalog will generate new cache entries
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           entity,
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -259,8 +231,7 @@ public class StorageCredentialCacheTest {
       PolarisBaseEntity updateEntity =
           new 
PolarisBaseEntity.Builder(entity).internalPropertiesAsMap(internalMap).build();
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           PolarisEntity.of(updateEntity),
           /* allowedListAction= */ true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -271,8 +242,7 @@ public class StorageCredentialCacheTest {
     // allowedListAction changed to different value FALSE, will generate new 
entry
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           entity,
           /* allowedListAction= */ false,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -283,8 +253,7 @@ public class StorageCredentialCacheTest {
     // different allowedWriteLocations, will generate new entry
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           entity,
           /* allowedListAction= */ false,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -300,8 +269,7 @@ public class StorageCredentialCacheTest {
       PolarisBaseEntity updateEntity =
           new 
PolarisBaseEntity.Builder(entity).internalPropertiesAsMap(internalMap).build();
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           PolarisEntity.of(updateEntity),
           /* allowedListAction= */ false,
           Set.of("s3://differentbucket/path", "s3://bucket2/path"),
@@ -313,15 +281,11 @@ public class StorageCredentialCacheTest {
 
   @Test
   public void testCacheNotAffectedBy() {
-    storageCredentialCache = newStorageCredentialCache();
     List<ScopedCredentialsResult> mockedScopedCreds =
         getFakeScopedCreds(3, /* expireSoon= */ false);
 
     Mockito.when(
-            metaStoreManager.getSubscopedCredsForEntity(
-                Mockito.any(),
-                Mockito.anyLong(),
-                Mockito.anyLong(),
+            storageCredentialsVendor.getSubscopedCredsForEntity(
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
@@ -333,8 +297,7 @@ public class StorageCredentialCacheTest {
     List<PolarisEntity> entityList = getPolarisEntities();
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           entity,
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -346,8 +309,7 @@ public class StorageCredentialCacheTest {
     // entity ID does not affect the cache
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).id(1234).build()),
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -359,8 +321,7 @@ public class StorageCredentialCacheTest {
     // other property changes does not affect the cache
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).entityVersion(5).build()),
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
@@ -371,8 +332,7 @@ public class StorageCredentialCacheTest {
     // order of the allowedReadLocations does not affect the cache
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).entityVersion(5).build()),
           true,
           Set.of("s3://bucket2/path", "s3://bucket1/path"),
@@ -384,8 +344,7 @@ public class StorageCredentialCacheTest {
     // order of the allowedWriteLocations does not affect the cache
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
-          metaStoreManager,
-          callCtx,
+          storageCredentialsVendor,
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).entityVersion(5).build()),
           true,
           Set.of("s3://bucket2/path", "s3://bucket1/path"),
@@ -456,7 +415,6 @@ public class StorageCredentialCacheTest {
 
   @Test
   public void testExtraProperties() {
-    storageCredentialCache = newStorageCredentialCache();
     ScopedCredentialsResult properties =
         new ScopedCredentialsResult(
             StorageAccessConfig.builder()
@@ -465,10 +423,7 @@ public class StorageCredentialCacheTest {
                 .put(StorageAccessProperty.AWS_PATH_STYLE_ACCESS, "true")
                 .build());
     Mockito.when(
-            metaStoreManager.getSubscopedCredsForEntity(
-                Mockito.any(),
-                Mockito.anyLong(),
-                Mockito.anyLong(),
+            storageCredentialsVendor.getSubscopedCredsForEntity(
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
@@ -479,8 +434,7 @@ public class StorageCredentialCacheTest {
 
     StorageAccessConfig config =
         storageCredentialCache.getOrGenerateSubScopeCreds(
-            metaStoreManager,
-            callCtx,
+            storageCredentialsVendor,
             entityList.get(0),
             true,
             Set.of("s3://bucket1/path", "s3://bucket2/path"),
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index 89624418a..e8f5402b1 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -1085,7 +1085,7 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
         realmConfig.getConfig(FeatureConfiguration.OPTIMIZED_SIBLING_CHECK);
     if (useOptimizedSiblingCheck) {
       Optional<Optional<String>> directSiblingCheckResult =
-          
getMetaStoreManager().hasOverlappingSiblings(callContext.getPolarisCallContext(),
 entity);
+          
getMetaStoreManager().hasOverlappingSiblings(getCurrentPolarisContext(), 
entity);
       if (directSiblingCheckResult.isPresent()) {
         if (directSiblingCheckResult.get().isPresent()) {
           throw new org.apache.iceberg.exceptions.ForbiddenException(
@@ -2080,12 +2080,7 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
       Set<PolarisStorageActions> storageActions) {
     StorageAccessConfig storageAccessConfig =
         storageAccessConfigProvider.getStorageAccessConfig(
-            callContext,
-            identifier,
-            readLocations,
-            storageActions,
-            Optional.empty(),
-            resolvedStorageEntity);
+            identifier, readLocations, storageActions, Optional.empty(), 
resolvedStorageEntity);
     // Reload fileIO based on table specific context
     FileIO fileIO = fileIOFactory.loadFileIO(storageAccessConfig, 
ioImplClassName, tableProperties);
     // ensure the new fileIO is closed when the catalog is closed
@@ -2101,11 +2096,6 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
     return metaStoreManager;
   }
 
-  @VisibleForTesting
-  public void setFileIOFactory(FileIOFactory newFactory) {
-    this.fileIOFactory = newFactory;
-  }
-
   @VisibleForTesting
   long getCatalogId() {
     // TODO: Properly handle initialization
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 1de129cac..d2e48d2cc 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -812,7 +812,6 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
 
       StorageAccessConfig storageAccessConfig =
           storageAccessConfigProvider.getStorageAccessConfig(
-              callContext,
               tableIdentifier,
               tableLocations,
               actions,
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
index c132322f5..39b2f9d5f 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
@@ -21,7 +21,7 @@ package org.apache.polaris.service.catalog.io;
 import com.google.common.annotations.VisibleForTesting;
 import io.smallrye.common.annotation.Identifier;
 import jakarta.annotation.Nonnull;
-import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.RequestScoped;
 import jakarta.inject.Inject;
 import java.util.HashMap;
 import java.util.Map;
@@ -36,7 +36,7 @@ import org.apache.polaris.core.storage.StorageAccessConfig;
  * <p>This class acts as a translation layer between Polaris properties and 
the properties required
  * by Iceberg's {@link FileIO}.
  */
-@ApplicationScoped
+@RequestScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
 
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
index b9bfbf97e..50a9c6883 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java
@@ -19,7 +19,7 @@
 package org.apache.polaris.service.catalog.io;
 
 import jakarta.annotation.Nonnull;
-import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.RequestScoped;
 import java.util.Map;
 import org.apache.iceberg.io.FileIO;
 import org.apache.polaris.core.storage.StorageAccessConfig;
@@ -27,7 +27,7 @@ import org.apache.polaris.core.storage.StorageAccessConfig;
 /**
  * Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3.
  *
- * <p>Implementations are available via CDI as {@link ApplicationScoped @ApplicationScoped} beans.
+ * <p>Implementations are available via CDI as {@link RequestScoped @RequestScoped} beans.
  */
 public interface FileIOFactory {
 
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
index 7d5a112bb..5dc657a60 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
@@ -19,18 +19,9 @@
 package org.apache.polaris.service.catalog.io;
 
 import java.util.Optional;
-import java.util.Set;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.polaris.core.config.FeatureConfiguration;
-import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
-import org.apache.polaris.core.storage.PolarisCredentialVendor;
-import org.apache.polaris.core.storage.PolarisStorageActions;
-import org.apache.polaris.core.storage.StorageAccessConfig;
-import org.apache.polaris.core.storage.cache.StorageCredentialCache;
-import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,68 +52,4 @@ public class FileIOUtil {
             .findFirst();
     return storageInfoEntity;
   }
-
-  /**
-   * Refreshes or generates subscoped creds for accessing table storage based 
on the params.
-   *
-   * <p>Use cases:
-   *
-   * <ul>
-   *   <li>In {@link IcebergCatalog}, subscoped credentials are generated or 
refreshed when the
-   *       client sends a loadTable request to vend credentials.
-   *   <li>In {@link DefaultFileIOFactory}, subscoped credentials are obtained 
to access the storage
-   *       and read/write metadata JSON files.
-   * </ul>
-   */
-  public static StorageAccessConfig refreshAccessConfig(
-      CallContext callContext,
-      StorageCredentialCache storageCredentialCache,
-      PolarisCredentialVendor credentialVendor,
-      TableIdentifier tableIdentifier,
-      Set<String> tableLocations,
-      Set<PolarisStorageActions> storageActions,
-      PolarisEntity entity,
-      Optional<String> refreshCredentialsEndpoint) {
-
-    boolean skipCredentialSubscopingIndirection =
-        callContext
-            .getRealmConfig()
-            .getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION);
-    if (skipCredentialSubscopingIndirection) {
-      LOGGER
-          .atDebug()
-          .addKeyValue("tableIdentifier", tableIdentifier)
-          .log("Skipping generation of subscoped creds for table");
-      return StorageAccessConfig.builder().build();
-    }
-
-    boolean allowList =
-        storageActions.contains(PolarisStorageActions.LIST)
-            || storageActions.contains(PolarisStorageActions.ALL);
-    Set<String> writeLocations =
-        storageActions.contains(PolarisStorageActions.WRITE)
-                || storageActions.contains(PolarisStorageActions.DELETE)
-                || storageActions.contains(PolarisStorageActions.ALL)
-            ? tableLocations
-            : Set.of();
-    StorageAccessConfig storageAccessConfig =
-        storageCredentialCache.getOrGenerateSubScopeCreds(
-            credentialVendor,
-            callContext.getPolarisCallContext(),
-            entity,
-            allowList,
-            tableLocations,
-            writeLocations,
-            refreshCredentialsEndpoint);
-    LOGGER
-        .atDebug()
-        .addKeyValue("tableIdentifier", tableIdentifier)
-        .addKeyValue("credentialKeys", 
storageAccessConfig.credentials().keySet())
-        .addKeyValue("extraProperties", storageAccessConfig.extraProperties())
-        .log("Loaded scoped credentials for table");
-    if (storageAccessConfig.credentials().isEmpty()) {
-      LOGGER.debug("No credentials found for table");
-    }
-    return storageAccessConfig;
-  }
 }
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java
index 80e62856a..d6316c6e7 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java
@@ -20,17 +20,17 @@
 package org.apache.polaris.service.catalog.io;
 
 import jakarta.annotation.Nonnull;
-import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.RequestScoped;
 import jakarta.inject.Inject;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.entity.PolarisEntity;
-import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.storage.PolarisStorageActions;
 import org.apache.polaris.core.storage.StorageAccessConfig;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,26 +42,25 @@ import org.slf4j.LoggerFactory;
  * <p>This provider decouples credential vending from catalog implementations, 
and should be the
  * primary entrypoint to get sub-scoped credentials for accessing table data.
  */
-@ApplicationScoped
+@RequestScoped
 public class StorageAccessConfigProvider {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StorageAccessConfigProvider.class);
 
   private final StorageCredentialCache storageCredentialCache;
-  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final StorageCredentialsVendor storageCredentialsVendor;
 
   @Inject
   public StorageAccessConfigProvider(
       StorageCredentialCache storageCredentialCache,
-      MetaStoreManagerFactory metaStoreManagerFactory) {
+      StorageCredentialsVendor storageCredentialsVendor) {
     this.storageCredentialCache = storageCredentialCache;
-    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.storageCredentialsVendor = storageCredentialsVendor;
   }
 
   /**
    * Vends credentials for accessing table storage at explicit locations.
    *
-   * @param callContext the call context containing realm, principal, and 
security context
    * @param tableIdentifier the table identifier, used for logging and refresh 
endpoint construction
    * @param tableLocations set of storage location URIs to scope credentials to
    * @param storageActions the storage operations (READ, WRITE, LIST, DELETE) 
to scope credentials
@@ -72,7 +71,6 @@ public class StorageAccessConfigProvider {
    *     config found
    */
   public StorageAccessConfig getStorageAccessConfig(
-      @Nonnull CallContext callContext,
       @Nonnull TableIdentifier tableIdentifier,
       @Nonnull Set<String> tableLocations,
       @Nonnull Set<PolarisStorageActions> storageActions,
@@ -91,14 +89,47 @@ public class StorageAccessConfigProvider {
           .log("Table entity has no storage configuration in its hierarchy");
       return StorageAccessConfig.builder().supportsCredentialVending(false).build();
     }
-    return FileIOUtil.refreshAccessConfig(
-        callContext,
-        storageCredentialCache,
-        metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext()),
-        tableIdentifier,
-        tableLocations,
-        storageActions,
-        storageInfo.get(),
-        refreshCredentialsEndpoint);
+    PolarisEntity storageInfoEntity = storageInfo.get();
+
+    boolean skipCredentialSubscopingIndirection =
+        storageCredentialsVendor
+            .getRealmConfig()
+            .getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION);
+    if (skipCredentialSubscopingIndirection) {
+      LOGGER
+          .atDebug()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Skipping generation of subscoped creds for table");
+      return StorageAccessConfig.builder().build();
+    }
+
+    boolean allowList =
+        storageActions.contains(PolarisStorageActions.LIST)
+            || storageActions.contains(PolarisStorageActions.ALL);
+    Set<String> writeLocations =
+        storageActions.contains(PolarisStorageActions.WRITE)
+                || storageActions.contains(PolarisStorageActions.DELETE)
+                || storageActions.contains(PolarisStorageActions.ALL)
+            ? tableLocations
+            : Set.of();
+    StorageAccessConfig accessConfig =
+        storageCredentialCache.getOrGenerateSubScopeCreds(
+            storageCredentialsVendor,
+            storageInfoEntity,
+            allowList,
+            tableLocations,
+            writeLocations,
+            refreshCredentialsEndpoint);
+
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .addKeyValue("credentialKeys", accessConfig.credentials().keySet())
+        .addKeyValue("extraProperties", accessConfig.extraProperties())
+        .log("Loaded scoped credentials for table");
+    if (accessConfig.credentials().isEmpty()) {
+      LOGGER.debug("No credentials found for table");
+    }
+    return accessConfig;
   }
 }
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
index bd03c022b..13768f2ba 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
@@ -59,6 +59,7 @@ import org.apache.polaris.core.persistence.resolver.Resolver;
 import org.apache.polaris.core.persistence.resolver.ResolverFactory;
 import org.apache.polaris.core.secrets.UserSecretsManager;
 import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.core.storage.cache.StorageCredentialCacheConfig;
 import org.apache.polaris.service.auth.AuthenticationConfiguration;
@@ -220,6 +221,7 @@ public class ServiceProducers {
   }
 
   @Produces
+  @RequestScoped
   public FileIOFactory fileIOFactory(
       FileIOConfiguration config, @Any Instance<FileIOFactory> fileIOFactories) {
     return fileIOFactories.select(Identifier.Literal.of(config.type())).get();
@@ -246,6 +248,13 @@ public class ServiceProducers {
     return metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
   }
 
+  @Produces
+  @RequestScoped
+  public StorageCredentialsVendor storageCredentialsVendor(
+      PolarisMetaStoreManager metaStoreManager, CallContext callContext) {
+    return new StorageCredentialsVendor(metaStoreManager, callContext);
+  }
+
   @Produces
   public UserSecretsManagerFactory userSecretsManagerFactory(
       SecretsManagerConfiguration config,
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
index fdb4ef5e0..67dcacb9d 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
@@ -53,7 +53,7 @@ public class BatchFileCleanupTaskHandler extends 
FileCleanupTaskHandler {
     BatchFileCleanupTask cleanupTask = 
task.readData(BatchFileCleanupTask.class);
     TableIdentifier tableId = cleanupTask.tableId();
     List<String> batchFiles = cleanupTask.batchFiles();
-    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
+    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId)) {
       List<String> validFiles =
           batchFiles.stream().filter(file -> TaskUtils.exists(file, 
authorizedFileIO)).toList();
       if (validFiles.isEmpty()) {
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
index f71adc2bc..7d9bc095f 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
@@ -61,7 +61,7 @@ public class ManifestFileCleanupTaskHandler extends 
FileCleanupTaskHandler {
   public boolean handleTask(TaskEntity task, CallContext callContext) {
     ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
     TableIdentifier tableId = cleanupTask.tableId();
-    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, 
callContext)) {
+    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId)) {
       ManifestFile manifestFile = 
TaskUtils.decodeManifestFileData(cleanupTask.manifestFileData());
       return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
     }
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
index 777d9bc8d..54976dead 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
@@ -93,8 +93,7 @@ public class TableCleanupTaskHandler implements TaskHandler {
     // It's likely the cleanupTask has already been completed, but wasn't 
dropped successfully.
     // Log a
     // warning and move on
-    try (FileIO fileIO =
-        fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier(), callContext)) {
+    try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier())) {
       if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) {
         LOGGER
             .atWarn()
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
index b4c31d692..cddb67147 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
@@ -18,7 +18,7 @@
  */
 package org.apache.polaris.service.task;
 
-import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.RequestScoped;
 import jakarta.inject.Inject;
 import java.util.HashMap;
 import java.util.List;
@@ -28,7 +28,6 @@ import java.util.Set;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
-import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisTaskConstants;
 import org.apache.polaris.core.entity.TaskEntity;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
@@ -38,7 +37,7 @@ import org.apache.polaris.core.storage.StorageAccessConfig;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider;
 
-@ApplicationScoped
+@RequestScoped
 public class TaskFileIOSupplier {
   private final FileIOFactory fileIOFactory;
   private final StorageAccessConfigProvider accessConfigProvider;
@@ -50,8 +49,7 @@ public class TaskFileIOSupplier {
     this.accessConfigProvider = storageAccessConfigProvider;
   }
 
-  public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) {
-
+  public FileIO apply(TaskEntity task, TableIdentifier identifier) {
     Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
     Map<String, String> properties = new HashMap<>(internalProperties);
 
@@ -64,7 +62,7 @@ public class TaskFileIOSupplier {
         new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity));
     StorageAccessConfig storageAccessConfig =
         accessConfigProvider.getStorageAccessConfig(
-            callContext, identifier, locations, storageActions, Optional.empty(), resolvedPath);
+            identifier, locations, storageActions, Optional.empty(), resolvedPath);
 
     String ioImpl =
         properties.getOrDefault(
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java
index 5cda9c798..c2d0e997b 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java
@@ -60,6 +60,7 @@ import org.apache.polaris.core.secrets.UserSecretsManager;
 import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
 import org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration;
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
@@ -156,8 +157,10 @@ public abstract class 
AbstractPolarisGenericTableCatalogTest {
             metaStoreManagerFactory.getOrCreateSession(realmContext),
             configurationStore);
     realmConfig = polarisContext.getRealmConfig();
+    StorageCredentialsVendor storageCredentialsVendor =
+        new StorageCredentialsVendor(metaStoreManager, polarisContext);
     storageAccessConfigProvider =
-        new StorageAccessConfigProvider(storageCredentialCache, 
metaStoreManagerFactory);
+        new StorageAccessConfigProvider(storageCredentialCache, 
storageCredentialsVendor);
 
     PrincipalEntity rootPrincipal =
         metaStoreManager.findRootPrincipal(polarisContext).orElseThrow();
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
index 3e7cc1985..e9a0b2e77 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
@@ -130,6 +130,7 @@ import 
org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
 import org.apache.polaris.core.storage.StorageAccessConfig;
 import org.apache.polaris.core.storage.StorageAccessProperty;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration;
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
@@ -290,8 +291,11 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
             metaStoreManagerFactory.getOrCreateSession(realmContext),
             configurationStore);
     realmConfig = polarisContext.getRealmConfig();
+    StorageCredentialsVendor storageCredentialsVendor =
+        new StorageCredentialsVendor(metaStoreManager, polarisContext);
     storageAccessConfigProvider =
-        new StorageAccessConfigProvider(storageCredentialCache, 
metaStoreManagerFactory);
+        new StorageAccessConfigProvider(storageCredentialCache, 
storageCredentialsVendor);
+
     EntityCache entityCache = createEntityCache(diagServices, realmConfig, 
metaStoreManager);
     resolverFactory =
         (principal, referenceCatalogName) ->
@@ -1915,7 +1919,7 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
         .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), 
SESSION_TOKEN);
     FileIO fileIO =
         new TaskFileIOSupplier(new DefaultFileIOFactory(), 
storageAccessConfigProvider)
-            .apply(taskEntity, TABLE, polarisContext);
+            .apply(taskEntity, TABLE);
     
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
     Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
         .isInstanceOf(InMemoryFileIO.class);
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
index f8468d6bf..7dd45d180 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java
@@ -53,6 +53,7 @@ import 
org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
 import org.apache.polaris.core.persistence.resolver.ResolverFactory;
 import org.apache.polaris.core.secrets.UserSecretsManager;
 import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
@@ -162,8 +163,11 @@ public abstract class AbstractIcebergCatalogViewTest 
extends ViewCatalogTests<Ic
             metaStoreManagerFactory.getOrCreateSession(realmContext),
             configurationStore);
     realmConfig = polarisContext.getRealmConfig();
+    StorageCredentialsVendor storageCredentialsVendor =
+        new StorageCredentialsVendor(metaStoreManager, polarisContext);
     storageAccessConfigProvider =
-        new StorageAccessConfigProvider(storageCredentialCache, 
metaStoreManagerFactory);
+        new StorageAccessConfigProvider(storageCredentialCache, 
storageCredentialsVendor);
+
     PrincipalEntity rootPrincipal =
         metaStoreManager.findRootPrincipal(polarisContext).orElseThrow();
     PolarisPrincipal authenticatedRoot = PolarisPrincipal.of(rootPrincipal, 
Set.of());
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index 673e0f292..09268cf2c 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -171,7 +171,7 @@ public class FileIOFactoryTest {
     FileIO fileIO =
         new TaskFileIOSupplier(
                 testServices.fileIOFactory(), 
testServices.storageAccessConfigProvider())
-            .apply(taskEntity, TABLE, callContext);
+            .apply(taskEntity, TABLE);
     
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
     Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
         .isInstanceOf(InMemoryFileIO.class);
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java
index 3ee2faf34..eba010e11 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java
@@ -72,6 +72,7 @@ import org.apache.polaris.core.secrets.UserSecretsManager;
 import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
 import org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration;
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
@@ -177,8 +178,10 @@ public abstract class AbstractPolicyCatalogTest {
             metaStoreManagerFactory.getOrCreateSession(realmContext),
             configurationStore);
     realmConfig = polarisContext.getRealmConfig();
+    StorageCredentialsVendor storageCredentialsVendor =
+        new StorageCredentialsVendor(metaStoreManager, polarisContext);
     storageAccessConfigProvider =
-        new StorageAccessConfigProvider(storageCredentialCache, 
metaStoreManagerFactory);
+        new StorageAccessConfigProvider(storageCredentialCache, 
storageCredentialsVendor);
 
     PrincipalEntity rootPrincipal =
         metaStoreManager.findRootPrincipal(polarisContext).orElseThrow();
diff --git 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 6041d2c48..4c910ffbd 100644
--- 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -60,6 +60,7 @@ import org.apache.polaris.core.persistence.resolver.Resolver;
 import org.apache.polaris.core.persistence.resolver.ResolverFactory;
 import org.apache.polaris.core.secrets.UserSecretsManager;
 import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
+import org.apache.polaris.core.storage.StorageCredentialsVendor;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.core.storage.cache.StorageCredentialCacheConfig;
 import org.apache.polaris.service.admin.PolarisAdminService;
@@ -273,8 +274,10 @@ public record TestServices(
       PolarisCredentialManager credentialManager =
           new DefaultPolarisCredentialManager(realmContext, 
mockCredentialVendors);
 
+      StorageCredentialsVendor storageCredentialsVendor =
+          new StorageCredentialsVendor(metaStoreManager, callContext);
       StorageAccessConfigProvider storageAccessConfigProvider =
-          new StorageAccessConfigProvider(storageCredentialCache, 
metaStoreManagerFactory);
+          new StorageAccessConfigProvider(storageCredentialCache, 
storageCredentialsVendor);
       FileIOFactory fileIOFactory = fileIOFactorySupplier.get();
 
       TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);

Reply via email to