This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e6d92937 add refresh credentials property to loadTableResult (#2341)
9e6d92937 is described below

commit 9e6d92937f5f11e9879b1cc9a7a80b75cf341e31
Author: Jason <jason...@gmail.com>
AuthorDate: Thu Aug 28 05:03:39 2025 +0300

    add refresh credentials property to loadTableResult (#2341)
    
    * add refresh credentials property to loadTableResult
    
    * IcebergCatalogAdapterTest: Added test to ensure refresh credentials 
endpoint is included
    
    * delegate refresh credential endpoint configuration to storage integration
    
    * GCP: Add refresh credential properties
---
 CHANGELOG.md                                       |  5 ++
 .../AtomicOperationMetaStoreManager.java           |  6 +-
 .../TransactionWorkspaceMetaStoreManager.java      |  6 +-
 .../TransactionalMetaStoreManagerImpl.java         |  6 +-
 .../polaris/core/rest/PolarisResourcePaths.java    | 11 ++++
 .../core/storage/PolarisCredentialVendor.java      |  8 ++-
 .../core/storage/PolarisStorageIntegration.java    |  8 ++-
 .../core/storage/StorageAccessProperty.java        | 21 +++++++
 .../aws/AwsCredentialsStorageIntegration.java      |  8 ++-
 .../azure/AzureCredentialsStorageIntegration.java  | 17 ++++-
 .../core/storage/cache/StorageCredentialCache.java |  9 ++-
 .../storage/cache/StorageCredentialCacheKey.java   | 10 ++-
 .../gcp/GcpCredentialsStorageIntegration.java      | 10 ++-
 .../storage/InMemoryStorageIntegrationTest.java    |  4 +-
 .../AzureCredentialsStorageIntegrationTest.java    | 22 ++++++-
 .../storage/cache/StorageCredentialCacheTest.java  | 73 +++++++++++++++-------
 .../aws/AwsCredentialsStorageIntegrationTest.java  | 51 ++++++++++++---
 .../AzureCredentialStorageIntegrationTest.java     |  4 +-
 .../gcp/GcpCredentialsStorageIntegrationTest.java  | 20 +++++-
 .../service/it/RestCatalogMinIOSpecialIT.java      |  7 ++-
 .../service/catalog/iceberg/IcebergCatalog.java    |  6 +-
 .../catalog/iceberg/IcebergCatalogAdapter.java     | 31 +++++++--
 .../catalog/iceberg/IcebergCatalogHandler.java     | 42 ++++++++++---
 .../iceberg/SupportsCredentialDelegation.java      |  4 +-
 .../service/catalog/io/DefaultFileIOFactory.java   |  3 +-
 .../polaris/service/catalog/io/FileIOUtil.java     |  6 +-
 .../PolarisStorageIntegrationProviderImpl.java     |  3 +-
 .../catalog/AbstractIcebergCatalogTest.java        |  4 +-
 .../catalog/IcebergCatalogHandlerAuthzTest.java    | 29 +++++----
 29 files changed, 341 insertions(+), 93 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cb4333995..f0ffb54e0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -83,6 +83,11 @@ at locations that better optimize for object storage.
 
 - Introduced bootstrap command options to specify custom schema files for 
database initialization.
 
+- Added refresh credentials endpoint configuration to LoadTableResponse for 
AWS, Azure, and GCP. Enabling
+automatic storage credential refresh per table on the client side. Java client 
version >= 1.8.0 is required.
+The endpoint path is always returned when using vended credentials, but 
clients must enable the 
+refresh-credentials flag for the desired storage provider.
+
 ### Changes
 
 - Polaris Management API clients must be prepared to deal with new attributes 
in `AwsStorageConfigInfo` objects.
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index 947d41511..f6f04143f 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -1582,7 +1582,8 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
       PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations) {
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
 
     // get meta store session we should be using
     BasePersistence ms = callCtx.getMetaStore();
@@ -1622,7 +1623,8 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
               callCtx.getRealmConfig(),
               allowListOperation,
               allowedReadLocations,
-              allowedWriteLocations);
+              allowedWriteLocations,
+              refreshCredentialsEndpoint);
       return new ScopedCredentialsResult(accessConfig);
     } catch (Exception ex) {
       return new ScopedCredentialsResult(
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index 2671ad98b..3d9f3c052 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -327,7 +327,8 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
       PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations) {
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
     return delegate.getSubscopedCredsForEntity(
         callCtx,
         catalogId,
@@ -335,7 +336,8 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
         entityType,
         allowListOperation,
         allowedReadLocations,
-        allowedWriteLocations);
+        allowedWriteLocations,
+        refreshCredentialsEndpoint);
   }
 
   @Override
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index c3e1a9fac..97af650b0 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -2040,7 +2040,8 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
       PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations) {
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
 
     // get meta store session we should be using
     TransactionalPersistence ms = ((TransactionalPersistence) 
callCtx.getMetaStore());
@@ -2075,7 +2076,8 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
               callCtx.getRealmConfig(),
               allowListOperation,
               allowedReadLocations,
-              allowedWriteLocations);
+              allowedWriteLocations,
+              refreshCredentialsEndpoint);
       return new ScopedCredentialsResult(accessConfig);
     } catch (Exception ex) {
       return new ScopedCredentialsResult(
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
index 8a30d7962..16eea08da 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
@@ -57,6 +57,17 @@ public class PolarisResourcePaths {
         "polaris", "v1", prefix, "namespaces", RESTUtil.encodeNamespace(ns), 
"generic-tables");
   }
 
+  public String credentialsPath(TableIdentifier ident) {
+    return SLASH.join(
+        "v1",
+        prefix,
+        "namespaces",
+        RESTUtil.encodeNamespace(ident.namespace()),
+        "tables",
+        RESTUtil.encodeString(ident.name()),
+        "credentials");
+  }
+
   public String genericTable(TableIdentifier ident) {
     return SLASH.join(
         "polaris",
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
index 04022d233..d64e9ad88 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.core.storage;
 
 import jakarta.annotation.Nonnull;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.entity.PolarisEntityType;
@@ -37,6 +38,10 @@ public interface PolarisCredentialVendor {
    *     allowedWriteLocations
    * @param allowedReadLocations a set of allowed to read locations
    * @param allowedWriteLocations a set of allowed to write locations
+   * @param refreshCredentialsEndpoint an optional endpoint to use for 
refreshing credentials. If
+   *     supported by the storage type it will be returned to the client in 
the appropriate
+   *     properties. The endpoint may be relative to the base URI and the 
client is responsible for
+   *     handling the relative path
    * @return an enum map containing the scoped credentials
    */
   @Nonnull
@@ -47,5 +52,6 @@ public interface PolarisCredentialVendor {
       PolarisEntityType entityType,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations);
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint);
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
index c98982091..1828d01c8 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
@@ -20,6 +20,7 @@ package org.apache.polaris.core.storage;
 
 import jakarta.annotation.Nonnull;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.config.RealmConfig;
 
@@ -55,13 +56,18 @@ public abstract class PolarisStorageIntegration<T extends 
PolarisStorageConfigur
    *     locations
    * @param allowedReadLocations a set of allowed to read locations
    * @param allowedWriteLocations a set of allowed to write locations
+   * @param refreshCredentialsEndpoint an optional endpoint to use for 
refreshing credentials. If
+   *     supported by the storage type it will be returned to the client in 
the appropriate
+   *     properties. The endpoint may be relative to the base URI and the 
client is responsible for
+   *     handling the relative path
    * @return An enum map including the scoped credentials
    */
   public abstract AccessConfig getSubscopedCreds(
       @Nonnull RealmConfig realmConfig,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations);
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint);
 
   /**
    * Validate access for the provided operation actions and locations.
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java
index 33526d2e2..faa29c31e 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java
@@ -18,6 +18,9 @@
  */
 package org.apache.polaris.core.storage;
 
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.gcp.GCPProperties;
+
 /**
  * A subset of Iceberg catalog properties recognized by Polaris.
  *
@@ -39,6 +42,12 @@ public enum StorageAccessProperty {
       Boolean.class, "s3.path-style-access", "whether to use S3 path style 
access", false),
   CLIENT_REGION(
       String.class, "client.region", "region to configure client for making 
requests to AWS"),
+  AWS_REFRESH_CREDENTIALS_ENDPOINT(
+      String.class,
+      AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
+      "the endpoint to load vended credentials for a table from the catalog",
+      false,
+      false),
 
   GCS_ACCESS_TOKEN(String.class, "gcs.oauth2.token", "the gcs scoped access 
token"),
   GCS_ACCESS_TOKEN_EXPIRES_AT(
@@ -47,11 +56,23 @@ public enum StorageAccessProperty {
       "the time the gcs access token expires, in milliseconds",
       true,
       true),
+  GCS_REFRESH_CREDENTIALS_ENDPOINT(
+      String.class,
+      GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
+      "the endpoint to load vended credentials for a table from the catalog",
+      false,
+      false),
 
   // Currently not using ACCESS TOKEN as the ResolvingFileIO is using 
ADLSFileIO for azure case and
   // it expects for SAS
   AZURE_ACCESS_TOKEN(String.class, "", "the azure scoped access token"),
   AZURE_SAS_TOKEN(String.class, "adls.sas-token.", "an azure shared access 
signature token"),
+  AZURE_REFRESH_CREDENTIALS_ENDPOINT(
+      String.class,
+      "adls.refresh-credentials-endpoint",
+      "the endpoint to load vended credentials for a table from the catalog",
+      false,
+      false),
   EXPIRATION_TIME(
       Long.class,
       "expiration-time",
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
index 616fb1f4d..3e93ba7b4 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
@@ -74,7 +74,8 @@ public class AwsCredentialsStorageIntegration
       @Nonnull RealmConfig realmConfig,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations) {
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
     int storageCredentialDurationSeconds =
         realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS);
     AwsStorageConfigurationInfo storageConfig = config();
@@ -120,6 +121,11 @@ public class AwsCredentialsStorageIntegration
       accessConfig.put(StorageAccessProperty.CLIENT_REGION, region);
     }
 
+    refreshCredentialsEndpoint.ifPresent(
+        endpoint -> {
+          
accessConfig.put(StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENDPOINT, 
endpoint);
+        });
+
     URI endpointUri = storageConfig.getEndpointUri();
     if (endpointUri != null) {
       accessConfig.put(StorageAccessProperty.AWS_ENDPOINT, 
endpointUri.toString());
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
index 50dd8c414..5b466b0c3 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
@@ -46,6 +46,7 @@ import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.util.HashSet;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.storage.AccessConfig;
@@ -76,7 +77,8 @@ public class AzureCredentialsStorageIntegration
       @Nonnull RealmConfig realmConfig,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations) {
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
     String loc =
         !allowedWriteLocations.isEmpty()
             ? allowedWriteLocations.stream().findAny().orElse(null)
@@ -169,15 +171,24 @@ public class AzureCredentialsStorageIntegration
           String.format("Endpoint %s not supported", location.getEndpoint()));
     }
 
-    return toAccessConfig(sasToken, storageDnsName, 
sanitizedEndTime.toInstant());
+    return toAccessConfig(
+        sasToken, storageDnsName, sanitizedEndTime.toInstant(), 
refreshCredentialsEndpoint);
   }
 
   @VisibleForTesting
-  static AccessConfig toAccessConfig(String sasToken, String storageDnsName, 
Instant expiresAt) {
+  static AccessConfig toAccessConfig(
+      String sasToken,
+      String storageDnsName,
+      Instant expiresAt,
+      Optional<String> refreshCredentialsEndpoint) {
     AccessConfig.Builder accessConfig = AccessConfig.builder();
     handleAzureCredential(accessConfig, sasToken, storageDnsName);
     accessConfig.put(
         StorageAccessProperty.EXPIRATION_TIME, 
String.valueOf(expiresAt.toEpochMilli()));
+    refreshCredentialsEndpoint.ifPresent(
+        endpoint -> {
+          
accessConfig.put(StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT, 
endpoint);
+        });
     return accessConfig.build();
   }
 
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
index d8d88edc6..d166ee4b1 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
@@ -105,7 +105,8 @@ public class StorageCredentialCache {
       @Nonnull PolarisEntity polarisEntity,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations) {
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
     if (!isTypeSupported(polarisEntity.getType())) {
       callCtx
           .getDiagServices()
@@ -117,7 +118,8 @@ public class StorageCredentialCache {
             polarisEntity,
             allowListOperation,
             allowedReadLocations,
-            allowedWriteLocations);
+            allowedWriteLocations,
+            refreshCredentialsEndpoint);
     LOGGER.atDebug().addKeyValue("key", key).log("subscopedCredsCache");
     Function<StorageCredentialCacheKey, StorageCredentialCacheEntry> loader =
         k -> {
@@ -130,7 +132,8 @@ public class StorageCredentialCache {
                   polarisEntity.getType(),
                   k.allowedListAction(),
                   k.allowedReadLocations(),
-                  k.allowedWriteLocations());
+                  k.allowedWriteLocations(),
+                  k.refreshCredentialsEndpoint());
           if (scopedCredentialsResult.isSuccess()) {
             long maxCacheDurationMs = 
maxCacheDurationMs(callCtx.getRealmConfig());
             return new StorageCredentialCacheEntry(
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
index 79eba7d1d..8b9d0542d 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.core.storage.cache;
 
 import jakarta.annotation.Nullable;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
@@ -47,12 +48,16 @@ public interface StorageCredentialCacheKey {
   @Value.Parameter(order = 6)
   Set<String> allowedWriteLocations();
 
+  @Value.Parameter(order = 7)
+  Optional<String> refreshCredentialsEndpoint();
+
   static StorageCredentialCacheKey of(
       String realmId,
       PolarisEntity entity,
       boolean allowedListAction,
       Set<String> allowedReadLocations,
-      Set<String> allowedWriteLocations) {
+      Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
     String storageConfigSerializedStr =
         entity
             .getInternalPropertiesAsMap()
@@ -63,6 +68,7 @@ public interface StorageCredentialCacheKey {
         storageConfigSerializedStr,
         allowedListAction,
         allowedReadLocations,
-        allowedWriteLocations);
+        allowedWriteLocations,
+        refreshCredentialsEndpoint);
   }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
index 0120df2b1..c0568cc9b 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Stream;
 import org.apache.polaris.core.config.RealmConfig;
@@ -75,7 +76,8 @@ public class GcpCredentialsStorageIntegration
       @Nonnull RealmConfig realmConfig,
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
-      @Nonnull Set<String> allowedWriteLocations) {
+      @Nonnull Set<String> allowedWriteLocations,
+      Optional<String> refreshCredentialsEndpoint) {
     try {
       sourceCredentials.refresh();
     } catch (IOException e) {
@@ -112,6 +114,12 @@ public class GcpCredentialsStorageIntegration
     accessConfig.put(
         StorageAccessProperty.GCS_ACCESS_TOKEN_EXPIRES_AT,
         String.valueOf(token.getExpirationTime().getTime()));
+
+    refreshCredentialsEndpoint.ifPresent(
+        endpoint -> {
+          
accessConfig.put(StorageAccessProperty.GCS_REFRESH_CREDENTIALS_ENDPOINT, 
endpoint);
+        });
+
     return accessConfig.build();
   }
 
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
index fa7777814..9ba5271ab 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
@@ -21,6 +21,7 @@ package org.apache.polaris.core.storage;
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.config.RealmConfig;
@@ -197,7 +198,8 @@ class InMemoryStorageIntegrationTest {
         @Nonnull RealmConfig realmConfig,
         boolean allowListOperation,
         @Nonnull Set<String> allowedReadLocations,
-        @Nonnull Set<String> allowedWriteLocations) {
+        @Nonnull Set<String> allowedWriteLocations,
+        Optional<String> refreshCredentialsEndpoint) {
       return null;
     }
   }
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java
index 89b60dba5..d613e5154 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java
@@ -22,7 +22,9 @@ package org.apache.polaris.core.storage.azure;
 import static 
org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration.toAccessConfig;
 
 import java.time.Instant;
+import java.util.Optional;
 import org.apache.polaris.core.storage.AccessConfig;
+import org.apache.polaris.core.storage.StorageAccessProperty;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -32,20 +34,34 @@ public class AzureCredentialsStorageIntegrationTest {
   public void testAzureCredentialFormatting() {
     Instant expiresAt = Instant.ofEpochMilli(Long.MAX_VALUE);
 
-    AccessConfig noSuffixResult = toAccessConfig("sasToken", "some_account", 
expiresAt);
+    AccessConfig noSuffixResult =
+        toAccessConfig("sasToken", "some_account", expiresAt, 
Optional.empty());
     Assertions.assertThat(noSuffixResult.credentials()).hasSize(2);
     
Assertions.assertThat(noSuffixResult.credentials()).containsKey("adls.sas-token.some_account");
+    Assertions.assertThat(noSuffixResult.credentials())
+        .doesNotContainKey(
+            
StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName());
 
     AccessConfig adlsSuffixResult =
-        toAccessConfig("sasToken", "some_account." + 
AzureLocation.ADLS_ENDPOINT, expiresAt);
+        toAccessConfig(
+            "sasToken",
+            "some_account." + AzureLocation.ADLS_ENDPOINT,
+            expiresAt,
+            Optional.of("endpoint/credentials"));
     Assertions.assertThat(adlsSuffixResult.credentials()).hasSize(3);
     Assertions.assertThat(adlsSuffixResult.credentials())
         .containsKey("adls.sas-token.some_account");
     Assertions.assertThat(adlsSuffixResult.credentials())
         .containsKey("adls.sas-token.some_account." + 
AzureLocation.ADLS_ENDPOINT);
 
+    Assertions.assertThat(adlsSuffixResult.extraProperties())
+        .containsEntry(
+            
StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName(),
+            "endpoint/credentials");
+
     AccessConfig blobSuffixResult =
-        toAccessConfig("sasToken", "some_account." + 
AzureLocation.BLOB_ENDPOINT, expiresAt);
+        toAccessConfig(
+            "sasToken", "some_account." + AzureLocation.BLOB_ENDPOINT, 
expiresAt, Optional.empty());
     Assertions.assertThat(blobSuffixResult.credentials()).hasSize(3);
     Assertions.assertThat(blobSuffixResult.credentials())
         .containsKey("adls.sas-token.some_account");
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
index a8e97133b..becc220a6 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.iceberg.exceptions.UnprocessableEntityException;
 import org.apache.polaris.core.PolarisCallContext;
@@ -89,7 +90,8 @@ public class StorageCredentialCacheTest {
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
-                Mockito.anySet()))
+                Mockito.anySet(),
+                Mockito.any()))
         .thenReturn(badResult);
     PolarisEntity polarisEntity =
         new PolarisEntity(
@@ -103,7 +105,8 @@ public class StorageCredentialCacheTest {
                     polarisEntity,
                     true,
                     Set.of("s3://bucket1/path"),
-                    Set.of("s3://bucket3/path")))
+                    Set.of("s3://bucket3/path"),
+                    Optional.empty()))
         .isInstanceOf(UnprocessableEntityException.class)
         .hasMessage("Failed to get subscoped credentials: extra_error_info");
   }
@@ -121,7 +124,8 @@ public class StorageCredentialCacheTest {
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
-                Mockito.anySet()))
+                Mockito.anySet(),
+                Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
         .thenReturn(mockedScopedCreds.get(1));
@@ -137,7 +141,8 @@ public class StorageCredentialCacheTest {
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
-        Set.of("s3://bucket3/path", "s3://bucket4/path"));
+        Set.of("s3://bucket3/path", "s3://bucket4/path"),
+        Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1);
 
     // subscope for the same entity and same allowed locations, will hit the 
cache
@@ -147,7 +152,8 @@ public class StorageCredentialCacheTest {
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
-        Set.of("s3://bucket3/path", "s3://bucket4/path"));
+        Set.of("s3://bucket3/path", "s3://bucket4/path"),
+        Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1);
   }
 
@@ -164,7 +170,8 @@ public class StorageCredentialCacheTest {
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
-                Mockito.anySet()))
+                Mockito.anySet(),
+                Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
         .thenReturn(mockedScopedCreds.get(2));
@@ -178,7 +185,8 @@ public class StorageCredentialCacheTest {
             polarisEntity,
             true,
             Set.of("s3://bucket1/path", "s3://bucket2/path"),
-            Set.of("s3://bucket/path"));
+            Set.of("s3://bucket/path"),
+            Optional.empty());
 
     // the entry will be evicted immediately because the token is expired
     storageCredentialCache.getOrGenerateSubScopeCreds(
@@ -187,7 +195,8 @@ public class StorageCredentialCacheTest {
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
-        Set.of("s3://bucket/path"));
+        Set.of("s3://bucket/path"),
+        Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
 
     storageCredentialCache.getOrGenerateSubScopeCreds(
@@ -196,7 +205,8 @@ public class StorageCredentialCacheTest {
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
-        Set.of("s3://bucket/path"));
+        Set.of("s3://bucket/path"),
+        Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
 
     storageCredentialCache.getOrGenerateSubScopeCreds(
@@ -205,7 +215,8 @@ public class StorageCredentialCacheTest {
         polarisEntity,
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
-        Set.of("s3://bucket/path"));
+        Set.of("s3://bucket/path"),
+        Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
   }
 
@@ -222,7 +233,8 @@ public class StorageCredentialCacheTest {
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
-                Mockito.anySet()))
+                Mockito.anySet(),
+                Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
         .thenReturn(mockedScopedCreds.get(2));
@@ -236,7 +248,8 @@ public class StorageCredentialCacheTest {
           entity,
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
-          Set.of("s3://bucket/path"));
+          Set.of("s3://bucket/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
     // update the entity's storage config, since StorageConfig changed, cache 
will generate new
@@ -253,7 +266,8 @@ public class StorageCredentialCacheTest {
           PolarisEntity.of(updateEntity),
           /* allowedListAction= */ true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
-          Set.of("s3://bucket/path"));
+          Set.of("s3://bucket/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
     // allowedListAction changed to different value FALSE, will generate new 
entry
@@ -264,7 +278,8 @@ public class StorageCredentialCacheTest {
           entity,
           /* allowedListAction= */ false,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
-          Set.of("s3://bucket/path"));
+          Set.of("s3://bucket/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
     // different allowedWriteLocations, will generate new entry
@@ -275,7 +290,8 @@ public class StorageCredentialCacheTest {
           entity,
           /* allowedListAction= */ false,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
-          Set.of("s3://differentbucket/path"));
+          Set.of("s3://differentbucket/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
     // different allowedReadLocations, will generate new try
@@ -291,7 +307,8 @@ public class StorageCredentialCacheTest {
           PolarisEntity.of(updateEntity),
           /* allowedListAction= */ false,
           Set.of("s3://differentbucket/path", "s3://bucket2/path"),
-          Set.of("s3://bucket/path"));
+          Set.of("s3://bucket/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
   }
@@ -310,7 +327,8 @@ public class StorageCredentialCacheTest {
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
-                Mockito.anySet()))
+                Mockito.anySet(),
+                Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
         .thenReturn(mockedScopedCreds.get(2));
@@ -322,7 +340,8 @@ public class StorageCredentialCacheTest {
           entity,
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
-          Set.of("s3://bucket3/path", "s3://bucket4/path"));
+          Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          Optional.empty());
     }
     
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
 
@@ -334,7 +353,8 @@ public class StorageCredentialCacheTest {
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).id(1234).build()),
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
-          Set.of("s3://bucket3/path", "s3://bucket4/path"));
+          Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
 
@@ -346,7 +366,8 @@ public class StorageCredentialCacheTest {
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).entityVersion(5).build()),
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
-          Set.of("s3://bucket3/path", "s3://bucket4/path"));
+          Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
     // order of the allowedReadLocations does not affect the cache
@@ -357,7 +378,8 @@ public class StorageCredentialCacheTest {
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).entityVersion(5).build()),
           true,
           Set.of("s3://bucket2/path", "s3://bucket1/path"),
-          Set.of("s3://bucket3/path", "s3://bucket4/path"));
+          Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
 
@@ -369,7 +391,8 @@ public class StorageCredentialCacheTest {
           new PolarisEntity(new 
PolarisBaseEntity.Builder(entity).entityVersion(5).build()),
           true,
           Set.of("s3://bucket2/path", "s3://bucket1/path"),
-          Set.of("s3://bucket4/path", "s3://bucket3/path"));
+          Set.of("s3://bucket4/path", "s3://bucket3/path"),
+          Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
   }
@@ -451,7 +474,8 @@ public class StorageCredentialCacheTest {
                 Mockito.any(),
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
-                Mockito.anySet()))
+                Mockito.anySet(),
+                Mockito.any()))
         .thenReturn(properties);
     List<PolarisEntity> entityList = getPolarisEntities();
 
@@ -462,7 +486,8 @@ public class StorageCredentialCacheTest {
             entityList.get(0),
             true,
             Set.of("s3://bucket1/path", "s3://bucket2/path"),
-            Set.of("s3://bucket3/path", "s3://bucket4/path"));
+            Set.of("s3://bucket3/path", "s3://bucket4/path"),
+            Optional.empty());
     Assertions.assertThat(config.credentials())
         .containsExactly(Map.entry("s3.secret-access-key", 
"super-secret-123"));
     Assertions.assertThat(config.extraProperties())
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
index 10ba4b908..7b4b50dec 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import jakarta.annotation.Nonnull;
 import java.time.Instant;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.storage.AccessConfig;
 import org.apache.polaris.core.storage.BaseStorageIntegrationTest;
@@ -95,7 +96,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                 EMPTY_REALM_CONFIG,
                 true,
                 Set.of(warehouseDir + "/namespace/table"),
-                Set.of(warehouseDir + "/namespace/table"));
+                Set.of(warehouseDir + "/namespace/table"),
+                Optional.of("/namespace/table/credentials"));
     assertThat(accessConfig.credentials())
         .isNotEmpty()
         .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), 
"sess")
@@ -104,6 +106,10 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
         .containsEntry(
             
StorageAccessProperty.AWS_SESSION_TOKEN_EXPIRES_AT_MS.getPropertyName(),
             String.valueOf(EXPIRE_TIME.toEpochMilli()));
+    assertThat(accessConfig.extraProperties())
+        .containsEntry(
+            
StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName(),
+            "/namespace/table/credentials");
   }
 
   @ParameterizedTest
@@ -242,7 +248,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                             EMPTY_REALM_CONFIG,
                             true,
                             Set.of(s3Path(bucket, firstPath), s3Path(bucket, 
secondPath)),
-                            Set.of(s3Path(bucket, firstPath))))
+                            Set.of(s3Path(bucket, firstPath)),
+                            null))
             .isInstanceOf(IllegalArgumentException.class);
         break;
       case AWS_PARTITION:
@@ -260,7 +267,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                     EMPTY_REALM_CONFIG,
                     true,
                     Set.of(s3Path(bucket, firstPath), s3Path(bucket, 
secondPath)),
-                    Set.of(s3Path(bucket, firstPath)));
+                    Set.of(s3Path(bucket, firstPath)),
+                    Optional.empty());
         assertThat(accessConfig.credentials())
             .isNotEmpty()
             .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), 
"sess")
@@ -360,7 +368,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                 EMPTY_REALM_CONFIG,
                 false, /* allowList = false*/
                 Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)),
-                Set.of(s3Path(bucket, firstPath)));
+                Set.of(s3Path(bucket, firstPath)),
+                Optional.empty());
     assertThat(accessConfig.credentials())
         .isNotEmpty()
         .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), 
"sess")
@@ -454,7 +463,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                 EMPTY_REALM_CONFIG,
                 true, /* allowList = true */
                 Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)),
-                Set.of());
+                Set.of(),
+                Optional.empty());
     assertThat(accessConfig.credentials())
         .isNotEmpty()
         .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), 
"sess")
@@ -516,7 +526,12 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                     .region("us-east-2")
                     .build(),
                 stsClient)
-            .getSubscopedCreds(EMPTY_REALM_CONFIG, true, /* allowList = true 
*/ Set.of(), Set.of());
+            .getSubscopedCreds(
+                EMPTY_REALM_CONFIG,
+                true, /* allowList = true */
+                Set.of(),
+                Set.of(),
+                Optional.empty());
     assertThat(accessConfig.credentials())
         .isNotEmpty()
         .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), 
"sess")
@@ -554,7 +569,11 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                                 .build(),
                             stsClient)
                         .getSubscopedCreds(
-                            EMPTY_REALM_CONFIG, true, /* allowList = true */ 
Set.of(), Set.of()))
+                            EMPTY_REALM_CONFIG,
+                            true, /* allowList = true */
+                            Set.of(),
+                            Set.of(),
+                            Optional.empty()))
             .isInstanceOf(IllegalArgumentException.class);
         break;
       case AWS_PARTITION:
@@ -569,7 +588,11 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                         .build(),
                     stsClient)
                 .getSubscopedCreds(
-                    EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), 
Set.of());
+                    EMPTY_REALM_CONFIG,
+                    true, /* allowList = true */
+                    Set.of(),
+                    Set.of(),
+                    Optional.empty());
         assertThat(accessConfig.credentials())
             .isNotEmpty()
             
.containsEntry(StorageAccessProperty.CLIENT_REGION.getPropertyName(), 
clientRegion);
@@ -604,7 +627,11 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                         .build(),
                     stsClient)
                 .getSubscopedCreds(
-                    EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), 
Set.of());
+                    EMPTY_REALM_CONFIG,
+                    true, /* allowList = true */
+                    Set.of(),
+                    Set.of(),
+                    Optional.empty());
         assertThat(accessConfig.credentials())
             .isNotEmpty()
             
.doesNotContainKey(StorageAccessProperty.CLIENT_REGION.getPropertyName());
@@ -621,7 +648,11 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                                 .build(),
                             stsClient)
                         .getSubscopedCreds(
-                            EMPTY_REALM_CONFIG, true, /* allowList = true */ 
Set.of(), Set.of()))
+                            EMPTY_REALM_CONFIG,
+                            true, /* allowList = true */
+                            Set.of(),
+                            Set.of(),
+                            Optional.empty()))
             .isInstanceOf(IllegalArgumentException.class);
         break;
       default:
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
index d1783baed..96e441000 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Stream;
 import org.apache.polaris.core.storage.AccessConfig;
 import org.apache.polaris.core.storage.BaseStorageIntegrationTest;
@@ -352,7 +353,8 @@ public class AzureCredentialStorageIntegrationTest extends 
BaseStorageIntegratio
         EMPTY_REALM_CONFIG,
         allowListAction,
         new HashSet<>(allowedReadLoc),
-        new HashSet<>(allowedWriteLoc));
+        new HashSet<>(allowedWriteLoc),
+        Optional.empty());
   }
 
   private BlobContainerClient createContainerClient(
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
index da890627e..f1a7afc63 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.storage.AccessConfig;
 import org.apache.polaris.core.storage.BaseStorageIntegrationTest;
@@ -59,6 +60,8 @@ class GcpCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
   private final String gcsServiceKeyJsonFileLocation =
       System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
 
+  private static final String REFRESH_ENDPOINT = "get/credentials";
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSubscope(boolean allowedListAction) throws Exception {
@@ -170,7 +173,8 @@ class GcpCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
         EMPTY_REALM_CONFIG,
         allowListAction,
         new HashSet<>(allowedReadLoc),
-        new HashSet<>(allowedWriteLoc));
+        new HashSet<>(allowedWriteLoc),
+        Optional.of(REFRESH_ENDPOINT));
   }
 
   @Test
@@ -295,6 +299,20 @@ class GcpCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
     return true;
   }
 
+  @Test
+  public void testRefreshCredentialsEndpointIsReturned() throws IOException {
+    Assumptions.assumeThat(gcsServiceKeyJsonFileLocation)
+        .describedAs("Environment variable GOOGLE_APPLICATION_CREDENTIALS not 
exits")
+        .isNotNull()
+        .isNotEmpty();
+
+    AccessConfig accessConfig =
+        subscopedCredsForOperations(
+            List.of("gs://bucket1/path/to/data"), 
List.of("gs://bucket1/path/to/data"), true);
+    
assertThat(accessConfig.get(StorageAccessProperty.GCS_REFRESH_CREDENTIALS_ENDPOINT))
+        .isEqualTo(REFRESH_ENDPOINT);
+  }
+
   private boolean isNotNull(JsonNode node) {
     return node != null && !node.isNull();
   }
diff --git 
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
index 6a93da886..83ddd0c57 100644
--- 
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
+++ 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -266,7 +267,11 @@ public class RestCatalogMinIOSpecialIT {
 
     LoadTableResponse loadTableResponse =
         catalogApi.loadTableWithAccessDelegation(catalogName, id, "ALL");
-    assertThat(loadTableResponse.config()).containsKey("s3.endpoint");
+    assertThat(loadTableResponse.config())
+        .containsKey("s3.endpoint")
+        .containsEntry(
+            AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
+            "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials");
 
     restCatalog.dropTable(id);
     assertThat(restCatalog.tableExists(id)).isFalse();
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index 438cf9922..5da1eeb12 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -830,7 +830,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
   public AccessConfig getAccessConfig(
       TableIdentifier tableIdentifier,
       TableMetadata tableMetadata,
-      Set<PolarisStorageActions> storageActions) {
+      Set<PolarisStorageActions> storageActions,
+      Optional<String> refreshCredentialsEndpoint) {
     Optional<PolarisEntity> storageInfo = findStorageInfo(tableIdentifier);
     if (storageInfo.isEmpty()) {
       LOGGER
@@ -846,7 +847,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
         tableIdentifier,
         StorageUtil.getLocationsAllowedToBeAccessed(tableMetadata),
         storageActions,
-        storageInfo.get());
+        storageInfo.get(),
+        refreshCredentialsEndpoint);
   }
 
   private String buildPrefixedLocation(TableIdentifier tableIdentifier) {
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 76401582a..860476cf8 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -75,6 +75,7 @@ import org.apache.polaris.core.persistence.resolver.Resolver;
 import org.apache.polaris.core.persistence.resolver.ResolverFactory;
 import org.apache.polaris.core.persistence.resolver.ResolverStatus;
 import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
 import org.apache.polaris.core.secrets.UserSecretsManager;
 import org.apache.polaris.service.catalog.AccessDelegationMode;
 import org.apache.polaris.service.catalog.CatalogPrefixParser;
@@ -359,12 +360,18 @@ public class IcebergCatalogAdapter
         securityContext,
         prefix,
         catalog -> {
+          Optional<String> refreshCredentialsEndpoint =
+              getRefreshCredentialsEndpoint(
+                  delegationModes,
+                  prefix,
+                  TableIdentifier.of(namespace, createTableRequest.name()));
           if (createTableRequest.stageCreate()) {
             if (delegationModes.isEmpty()) {
               return Response.ok(catalog.createTableStaged(ns, 
createTableRequest)).build();
             } else {
               return Response.ok(
-                      catalog.createTableStagedWithWriteDelegation(ns, 
createTableRequest))
+                      catalog.createTableStagedWithWriteDelegation(
+                          ns, createTableRequest, refreshCredentialsEndpoint))
                   .build();
             }
           } else if (delegationModes.isEmpty()) {
@@ -374,7 +381,8 @@ public class IcebergCatalogAdapter
                 .build();
           } else {
             LoadTableResponse response =
-                catalog.createTableDirectWithWriteDelegation(ns, 
createTableRequest);
+                catalog.createTableDirectWithWriteDelegation(
+                    ns, createTableRequest, refreshCredentialsEndpoint);
             return tryInsertETagHeader(
                     Response.ok(response), response, namespace, 
createTableRequest.name())
                 .build();
@@ -430,9 +438,12 @@ public class IcebergCatalogAdapter
                     .loadTableIfStale(tableIdentifier, ifNoneMatch, snapshots)
                     .orElseThrow(() -> new 
WebApplicationException(Response.Status.NOT_MODIFIED));
           } else {
+            Optional<String> refreshCredentialsEndpoint =
+                getRefreshCredentialsEndpoint(delegationModes, prefix, 
tableIdentifier);
             response =
                 catalog
-                    .loadTableWithAccessDelegationIfStale(tableIdentifier, 
ifNoneMatch, snapshots)
+                    .loadTableWithAccessDelegationIfStale(
+                        tableIdentifier, ifNoneMatch, snapshots, 
refreshCredentialsEndpoint)
                     .orElseThrow(() -> new 
WebApplicationException(Response.Status.NOT_MODIFIED));
           }
 
@@ -440,6 +451,15 @@ public class IcebergCatalogAdapter
         });
   }
 
+  private static Optional<String> getRefreshCredentialsEndpoint(
+      EnumSet<AccessDelegationMode> delegationModes,
+      String prefix,
+      TableIdentifier tableIdentifier) {
+    return delegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS)
+        ? Optional.of(new 
PolarisResourcePaths(prefix).credentialsPath(tableIdentifier))
+        : Optional.empty();
+  }
+
   @Override
   public Response tableExists(
       String prefix,
@@ -599,7 +619,10 @@ public class IcebergCatalogAdapter
         prefix,
         catalog -> {
           LoadTableResponse loadTableResponse =
-              catalog.loadTableWithAccessDelegation(tableIdentifier, "all");
+              catalog.loadTableWithAccessDelegation(
+                  tableIdentifier,
+                  "all",
+                  Optional.of(new 
PolarisResourcePaths(prefix).credentialsPath(tableIdentifier)));
           return Response.ok(
                   ImmutableLoadCredentialsResponse.builder()
                       .credentials(loadTableResponse.credentials())
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 17cdd7af3..57b8a990a 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -393,7 +393,9 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
   public LoadTableResponse createTableDirectWithWriteDelegation(
-      Namespace namespace, CreateTableRequest request) {
+      Namespace namespace,
+      CreateTableRequest request,
+      Optional<String> refreshCredentialsEndpoint) {
     PolarisAuthorizableOperation op =
         PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION;
     authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
@@ -432,7 +434,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
                   PolarisStorageActions.READ,
                   PolarisStorageActions.WRITE,
                   PolarisStorageActions.LIST),
-              SNAPSHOTS_ALL)
+              SNAPSHOTS_ALL,
+              refreshCredentialsEndpoint)
           .build();
     } else if (table instanceof BaseMetadataTable) {
       // metadata tables are loaded on the client side, return 
NoSuchTableException for now
@@ -500,7 +503,9 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
   }
 
   public LoadTableResponse createTableStagedWithWriteDelegation(
-      Namespace namespace, CreateTableRequest request) {
+      Namespace namespace,
+      CreateTableRequest request,
+      Optional<String> refreshCredentialsEndpoint) {
     PolarisAuthorizableOperation op =
         PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION;
     authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
@@ -514,7 +519,11 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
     TableMetadata metadata = stageTableCreateHelper(namespace, request);
 
     return buildLoadTableResponseWithDelegationCredentials(
-            ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL)
+            ident,
+            metadata,
+            Set.of(PolarisStorageActions.ALL),
+            SNAPSHOTS_ALL,
+            refreshCredentialsEndpoint)
         .build();
   }
 
@@ -623,8 +632,12 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
   }
 
   public LoadTableResponse loadTableWithAccessDelegation(
-      TableIdentifier tableIdentifier, String snapshots) {
-    return loadTableWithAccessDelegationIfStale(tableIdentifier, null, 
snapshots).get();
+      TableIdentifier tableIdentifier,
+      String snapshots,
+      Optional<String> refreshCredentialsEndpoint) {
+    return loadTableWithAccessDelegationIfStale(
+            tableIdentifier, null, snapshots, refreshCredentialsEndpoint)
+        .get();
   }
 
   /**
@@ -638,7 +651,10 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
    *     load table response, otherwise
    */
   public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
-      TableIdentifier tableIdentifier, IfNoneMatch ifNoneMatch, String 
snapshots) {
+      TableIdentifier tableIdentifier,
+      IfNoneMatch ifNoneMatch,
+      String snapshots,
+      Optional<String> refreshCredentialsEndpoint) {
     // Here we have a single method that falls through multiple candidate
     // PolarisAuthorizableOperations because instead of identifying the 
desired operation up-front
     // and
@@ -708,7 +724,11 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       TableMetadata tableMetadata = baseTable.operations().current();
       return Optional.of(
           buildLoadTableResponseWithDelegationCredentials(
-                  tableIdentifier, tableMetadata, actionsRequested, snapshots)
+                  tableIdentifier,
+                  tableMetadata,
+                  actionsRequested,
+                  snapshots,
+                  refreshCredentialsEndpoint)
               .build());
     } else if (table instanceof BaseMetadataTable) {
       // metadata tables are loaded on the client side, return 
NoSuchTableException for now
@@ -722,7 +742,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       TableIdentifier tableIdentifier,
       TableMetadata tableMetadata,
       Set<PolarisStorageActions> actions,
-      String snapshots) {
+      String snapshots,
+      Optional<String> refreshCredentialsEndpoint) {
     LoadTableResponse.Builder responseBuilder =
         LoadTableResponse.builder().withTableMetadata(tableMetadata);
     if (baseCatalog instanceof SupportsCredentialDelegation 
credentialDelegation) {
@@ -732,7 +753,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
           .addKeyValue("tableLocation", tableMetadata.location())
           .log("Fetching client credentials for table");
       AccessConfig accessConfig =
-          credentialDelegation.getAccessConfig(tableIdentifier, tableMetadata, 
actions);
+          credentialDelegation.getAccessConfig(
+              tableIdentifier, tableMetadata, actions, 
refreshCredentialsEndpoint);
       Map<String, String> credentialConfig = accessConfig.credentials();
       responseBuilder.addAllConfig(credentialConfig);
       responseBuilder.addAllConfig(accessConfig.extraProperties());
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java
index 21ec380eb..b85973ed8 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java
@@ -18,6 +18,7 @@
  */
 package org.apache.polaris.service.catalog.iceberg;
 
+import java.util.Optional;
 import java.util.Set;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -35,5 +36,6 @@ public interface SupportsCredentialDelegation {
   AccessConfig getAccessConfig(
       TableIdentifier tableIdentifier,
       TableMetadata tableMetadata,
-      Set<PolarisStorageActions> storageActions);
+      Set<PolarisStorageActions> storageActions,
+      Optional<String> refreshCredentialsEndpoint);
 }
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
index d2c73e268..81f6e7c8f 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
@@ -91,7 +91,8 @@ public class DefaultFileIOFactory implements FileIOFactory {
                     identifier,
                     tableLocations,
                     storageActions,
-                    storageInfo));
+                    storageInfo,
+                    Optional.empty()));
 
     // Update the FileIO with the subscoped credentials
     // Update with properties in case there are table-level overrides the 
credentials should
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
index c5ef12d78..f4a6320d6 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java
@@ -81,7 +81,8 @@ public class FileIOUtil {
       TableIdentifier tableIdentifier,
       Set<String> tableLocations,
       Set<PolarisStorageActions> storageActions,
-      PolarisEntity entity) {
+      PolarisEntity entity,
+      Optional<String> refreshCredentialsEndpoint) {
 
     boolean skipCredentialSubscopingIndirection =
         callContext
@@ -111,7 +112,8 @@ public class FileIOUtil {
             entity,
             allowList,
             tableLocations,
-            writeLocations);
+            writeLocations,
+            refreshCredentialsEndpoint);
     LOGGER
         .atDebug()
         .addKeyValue("tableIdentifier", tableIdentifier)
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
index e07bdd082..e04a9525b 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
@@ -113,7 +113,8 @@ public class PolarisStorageIntegrationProviderImpl 
implements PolarisStorageInte
                   @Nonnull RealmConfig realmConfig,
                   boolean allowListOperation,
                   @Nonnull Set<String> allowedReadLocations,
-                  @Nonnull Set<String> allowedWriteLocations) {
+                  @Nonnull Set<String> allowedWriteLocations,
+                  Optional<String> refreshCredentialsEndpoint) {
                 return AccessConfig.builder().build();
               }
 
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
index 696bca432..d66d25cd5 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
@@ -46,6 +46,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
@@ -1828,7 +1829,8 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
                 taskEntity.getType(),
                 true,
                 Set.of(tableMetadata.location()),
-                Set.of(tableMetadata.location()))
+                Set.of(tableMetadata.location()),
+                Optional.empty())
             .getAccessConfig()
             .credentials();
     Assertions.assertThat(credentials)
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
index a3ab18e3f..a8090f038 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java
@@ -26,6 +26,7 @@ import jakarta.ws.rs.core.SecurityContext;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
@@ -616,7 +617,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
         () -> {
           newWrapper(Set.of(PRINCIPAL_ROLE1))
-              .createTableDirectWithWriteDelegation(NS2, 
createDirectWithWriteDelegationRequest);
+              .createTableDirectWithWriteDelegation(
+                  NS2, createDirectWithWriteDelegationRequest, 
Optional.empty());
         },
         () -> {
           newWrapper(Set.of(PRINCIPAL_ROLE2)).dropTableWithPurge(newtable);
@@ -646,7 +648,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             PolarisPrivilege.TABLE_LIST),
         () -> {
           newWrapper(Set.of(PRINCIPAL_ROLE1))
-              .createTableDirectWithWriteDelegation(NS2, 
createDirectWithWriteDelegationRequest);
+              .createTableDirectWithWriteDelegation(
+                  NS2, createDirectWithWriteDelegationRequest, 
Optional.empty());
         });
   }
 
@@ -719,7 +722,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
         () -> {
           newWrapper(Set.of(PRINCIPAL_ROLE1))
-              .createTableStagedWithWriteDelegation(NS2, 
createStagedWithWriteDelegationRequest);
+              .createTableStagedWithWriteDelegation(
+                  NS2, createStagedWithWriteDelegationRequest, 
Optional.empty());
         },
         // createTableStagedWithWriteDelegation doesn't actually commit any 
metadata
         null,
@@ -748,7 +752,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             PolarisPrivilege.TABLE_LIST),
         () -> {
           newWrapper(Set.of(PRINCIPAL_ROLE1))
-              .createTableStagedWithWriteDelegation(NS2, 
createStagedWithWriteDelegationRequest);
+              .createTableStagedWithWriteDelegation(
+                  NS2, createStagedWithWriteDelegationRequest, 
Optional.empty());
         });
   }
 
@@ -892,7 +897,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             PolarisPrivilege.TABLE_READ_DATA,
             PolarisPrivilege.TABLE_WRITE_DATA,
             PolarisPrivilege.CATALOG_MANAGE_CONTENT),
-        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"),
+        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", 
Optional.empty()),
         null /* cleanupAction */);
   }
 
@@ -908,7 +913,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             PolarisPrivilege.TABLE_CREATE,
             PolarisPrivilege.TABLE_LIST,
             PolarisPrivilege.TABLE_DROP),
-        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"));
+        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", 
Optional.empty()));
   }
 
   @Test
@@ -921,7 +926,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             PolarisPrivilege.TABLE_READ_DATA,
             PolarisPrivilege.TABLE_WRITE_DATA,
             PolarisPrivilege.CATALOG_MANAGE_CONTENT),
-        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"),
+        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", 
Optional.empty()),
         null /* cleanupAction */);
   }
 
@@ -937,7 +942,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             PolarisPrivilege.TABLE_CREATE,
             PolarisPrivilege.TABLE_LIST,
             PolarisPrivilege.TABLE_DROP),
-        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"));
+        () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", 
Optional.empty()));
   }
 
   @Test
@@ -950,7 +955,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
         () ->
             newWrapper()
                 .loadTableWithAccessDelegationIfStale(
-                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
+                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", 
Optional.empty()),
         null /* cleanupAction */);
   }
 
@@ -969,7 +974,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
         () ->
             newWrapper()
                 .loadTableWithAccessDelegationIfStale(
-                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
+                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", 
Optional.empty()));
   }
 
   @Test
@@ -985,7 +990,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
         () ->
             newWrapper()
                 .loadTableWithAccessDelegationIfStale(
-                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
+                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", 
Optional.empty()),
         null /* cleanupAction */);
   }
 
@@ -1004,7 +1009,7 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
         () ->
             newWrapper()
                 .loadTableWithAccessDelegationIfStale(
-                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
+                    TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", 
Optional.empty()));
   }
 
   @Test

Reply via email to