snazy commented on code in PR #724:
URL: https://github.com/apache/polaris/pull/724#discussion_r1924733131


##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;

Review Comment:
   unused?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,

Review Comment:
   "entity" ?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO 
implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()

Review Comment:
   maybe add a note that this starts at the "leaf" (table) and walks "upwards" via namespaces to the "root"
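
   A minimal sketch of such a note on the existing method (wording is only a suggestion):

   ```
   /**
    * Finds the nearest entity in the resolved path that carries storage configuration.
    *
    * <p>Because the raw full path is reversed, the search starts at the "leaf" (the table)
    * and walks "upwards" through the namespaces towards the "root" catalog, returning the
    * first entity that defines a storage configuration.
    */
   private static Optional<PolarisEntity> findStorageInfoFromHierarchy(
       PolarisResolvedPathWrapper resolvedStorageEntity) {
     return resolvedStorageEntity.getRawFullPath().reversed().stream()
         .filter(
             e ->
                 e.getInternalPropertiesAsMap()
                     .containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
         .findFirst();
   }
   ```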



##########
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java:
##########
@@ -1459,6 +1462,46 @@ public void testDropTableWithPurgeDisabled() {
         
.hasMessageContaining(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.key);
   }
 
+  @Test
+  public void testRefreshIOWithCredentialsAndInternalProperties() {

Review Comment:
   Not sure how this actually _refreshes_ credentials.
   
   Anyway, having this test is good. However, I'd prefer to also have a dedicated test class for `DefaultFileIOFactory` and do extensive testing there, also with multiple realms?
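
   For illustration, a bare skeleton of what such a test class could cover (class and method names are hypothetical, and the multi-realm setup is intentionally elided):

   ```
   import org.junit.jupiter.api.Test;

   // Hypothetical skeleton only -- not part of this PR.
   class DefaultFileIOFactoryTest {

     @Test
     void loadFileIOUsesStorageConfigFromNearestAncestor() {
       // resolve a path where only the namespace carries storage config
       // and assert that credentials are derived from that entity
     }

     @Test
     void loadFileIOSkipsSubscopedCredentialsWhenConfigured() {
       // enable SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION and assert no creds are vended
     }

     @Test
     void loadFileIOIsIsolatedPerRealm() {
       // run the factory under two different realms and assert that vended
       // credentials and cached state do not leak across realms
     }
   }
   ```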



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;

Review Comment:
   unused?
   



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */

Review Comment:
   Nit: Iceberg's not an "SDK"
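
   For example, the class javadoc could simply read:

   ```
   /** A simple FileIOFactory implementation that defers all the work to the Iceberg library. */
   ```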



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO 
implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  private Map<String, String> refreshCredentials(
+      TableIdentifier tableIdentifier,
+      Set<PolarisStorageActions> storageActions,
+      Set<String> tableLocations,
+      PolarisEntity entity) {
+    Boolean skipCredentialSubscopingIndirection =
+        getBooleanContextConfiguration(
+            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
+            
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+    if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
+      LOGGER
+          .atInfo()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Skipping generation of subscoped creds for table");
+      return Map.of();
+    }
+
+    boolean allowList =
+        storageActions.contains(PolarisStorageActions.LIST)
+            || storageActions.contains(PolarisStorageActions.ALL);
+    Set<String> writeLocations =
+        storageActions.contains(PolarisStorageActions.WRITE)
+                || storageActions.contains(PolarisStorageActions.DELETE)
+                || storageActions.contains(PolarisStorageActions.ALL)
+            ? tableLocations
+            : Set.of();
+    Map<String, String> credentialsMap =
+        entityManager
+            .getCredentialCache()
+            .getOrGenerateSubScopeCreds(
+                getCredentialVendor(),
+                metaStoreSession,
+                entity,
+                allowList,
+                tableLocations,
+                writeLocations);
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .addKeyValue("credentialKeys", credentialsMap.keySet())
+        .log("Loaded scoped credentials for table");
+    if (credentialsMap.isEmpty()) {
+      LOGGER.debug("No credentials found for table");
+    }
+    return credentialsMap;
+  }
+
+  private PolarisCredentialVendor getCredentialVendor() {
+    return metaStoreManager;
+  }
+
+  private Boolean getBooleanContextConfiguration(String configKey, boolean 
defaultValue) {

Review Comment:
   Hm - why return a boxed primitive if there's a default value?
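
   A sketch of the unboxed variant (the configuration-store lookup itself is elided and represented by a hypothetical helper):

   ```
   // Since a default is always supplied, the method can return the primitive directly.
   private boolean getBooleanContextConfiguration(String configKey, boolean defaultValue) {
     Boolean value = lookupFromConfigurationStore(configKey); // placeholder for the existing lookup
     return value != null ? value : defaultValue;
   }

   // ...which also removes the Boolean.TRUE.equals(...) dance at the call site:
   boolean skipCredentialSubscopingIndirection =
       getBooleanContextConfiguration(
           PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
           PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
   if (skipCredentialSubscopingIndirection) {
     return Map.of();
   }
   ```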



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {

Review Comment:
   Better to make bean implementations package-private. There's no need to make them public.
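
   For example (same annotations, just dropping the `public` modifier):

   ```
   @ApplicationScoped
   @Identifier("default")
   class DefaultFileIOFactory implements FileIOFactory {
     // body unchanged
   }
   ```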



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO 
implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(

Review Comment:
   Nit: `@Nonnull` is superfluous for Java `Optional*` types



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;

Review Comment:
   This should be removed. The bean is application-scoped, but the realm is request-scoped.



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO 
implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  private Map<String, String> refreshCredentials(

Review Comment:
   Actually, since this PR changes FileIO/object-storage-credentials stuff, I think it would be good to separate and expose functionality from the interface (or a separate one?) to enable support for Iceberg's refresh-credentials via its REST API.
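
   Purely as a sketch of one possible shape (interface name and signature are hypothetical, not an agreed API):

   ```
   // Hypothetical: a separate interface that only vends subscoped storage credentials,
   // so a future credential-refresh endpoint could reuse it without constructing a FileIO.
   public interface StorageCredentialsVendor {
     Map<String, String> getSubscopedCredentials(
         TableIdentifier identifier,
         Set<String> locations,
         Set<PolarisStorageActions> storageActions,
         PolarisResolvedPathWrapper resolvedStorageEntity);
   }
   ```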



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {

Review Comment:
   Actually, all these parameters are per request/realm. Injecting them into an application-scoped bean either leads to build-time (Quarkus) or runtime (other CDI impls) issues, or makes realms unusable.
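
   Two possible directions, sketched only (neither is implied to be the final shape for this PR):

   ```
   // Option A: make the bean's scope match its dependencies, so injecting
   // realm/request-scoped collaborators is legal.
   @RequestScoped
   @Identifier("default")
   class DefaultFileIOFactory implements FileIOFactory {
     // constructor injection as in this PR
   }

   // Option B: keep the factory @ApplicationScoped and stateless, and have the caller
   // pass the per-request collaborators into loadFileIO(...) (the extra parameter shown
   // here is hypothetical).
   FileIO loadFileIO(
       PolarisMetaStoreSession metaStoreSession, // per-request, supplied by the caller
       String impl,
       TableIdentifier identifier,
       Set<String> readLocations,
       PolarisResolvedPathWrapper resolvedStorageEntity,
       Map<String, String> tableProperties,
       Set<PolarisStorageActions> storageActions);
   ```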



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);

Review Comment:
   This modifies an externally provided Map.
   You don't know whether the map is modifiable (runtime exceptions), what the call site intends to do with the map after the call to this function (unexpected behavior in a different component), or whether the map is shared across threads/requests (concurrent modifications / runtime exceptions).
   It also leaks credentials to the call site, which should not be the case.
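
   A non-mutating sketch of the same steps (needs a `java.util.HashMap` import; the caller's map stays untouched and the credentials stay inside the factory):

   ```
   Map<String, String> ioProperties = new HashMap<>(tableProperties);
   ioProperties.putAll(credentialsMap);
   ioProperties.putAll(internalProperties);
   return loadFileIO(impl, ioProperties);
   ```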



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO 
implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  private Map<String, String> refreshCredentials(
+      TableIdentifier tableIdentifier,
+      Set<PolarisStorageActions> storageActions,
+      Set<String> tableLocations,
+      PolarisEntity entity) {
+    Boolean skipCredentialSubscopingIndirection =
+        getBooleanContextConfiguration(
+            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
+            
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+    if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
+      LOGGER
+          .atInfo()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Skipping generation of subscoped creds for table");
+      return Map.of();
+    }
+
+    boolean allowList =
+        storageActions.contains(PolarisStorageActions.LIST)
+            || storageActions.contains(PolarisStorageActions.ALL);
+    Set<String> writeLocations =
+        storageActions.contains(PolarisStorageActions.WRITE)
+                || storageActions.contains(PolarisStorageActions.DELETE)
+                || storageActions.contains(PolarisStorageActions.ALL)
+            ? tableLocations
+            : Set.of();
+    Map<String, String> credentialsMap =

Review Comment:
   absolute nitpick and totally my own personal preference: using Java's (new-ish) `var` keyword actually increases readability, assuming that the variable name is expressive.
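
   e.g.:

   ```
   var credentialsMap =
       entityManager
           .getCredentialCache()
           .getOrGenerateSubScopeCreds(
               getCredentialVendor(),
               metaStoreSession,
               entity,
               allowList,
               tableLocations,
               writeLocations);
   ```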



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,

Review Comment:
   Not related to this PR, but this is a real annoyance in Iceberg.
   There can be symbolic FileIO implementation "names" and class names - and neither practically works across all programming languages (Iceberg/Java has its special implementations, PyIceberg has its own, ...).



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO 
implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  private Map<String, String> refreshCredentials(
+      TableIdentifier tableIdentifier,
+      Set<PolarisStorageActions> storageActions,
+      Set<String> tableLocations,
+      PolarisEntity entity) {
+    Boolean skipCredentialSubscopingIndirection =
+        getBooleanContextConfiguration(
+            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
+            
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+    if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
+      LOGGER
+          .atInfo()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Skipping generation of subscoped creds for table");
+      return Map.of();
+    }
+
+    boolean allowList =
+        storageActions.contains(PolarisStorageActions.LIST)
+            || storageActions.contains(PolarisStorageActions.ALL);
+    Set<String> writeLocations =
+        storageActions.contains(PolarisStorageActions.WRITE)
+                || storageActions.contains(PolarisStorageActions.DELETE)
+                || storageActions.contains(PolarisStorageActions.ALL)
+            ? tableLocations
+            : Set.of();
+    Map<String, String> credentialsMap =
+        entityManager
+            .getCredentialCache()
+            .getOrGenerateSubScopeCreds(
+                getCredentialVendor(),
+                metaStoreSession,
+                entity,
+                allowList,
+                tableLocations,
+                writeLocations);
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .addKeyValue("credentialKeys", credentialsMap.keySet())
+        .log("Loaded scoped credentials for table");
+    if (credentialsMap.isEmpty()) {
+      LOGGER.debug("No credentials found for table");
+    }
+    return credentialsMap;
+  }
+
+  private PolarisCredentialVendor getCredentialVendor() {
+    return metaStoreManager;

Review Comment:
   Better to narrow the type and change the name of this field - and inline its usage.
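
   For instance (sketch; the injected `PolarisMetaStoreManager` already satisfies `PolarisCredentialVendor`, so only the field changes):

   ```
   // field declared with the narrower type the code actually needs
   private final PolarisCredentialVendor credentialVendor;

   // assigned from the injected meta store manager in the constructor
   this.credentialVendor = metaStoreManager;

   // and used directly, dropping the getCredentialVendor() indirection
   .getOrGenerateSubScopeCreds(
       credentialVendor, metaStoreSession, entity, allowList, tableLocations, writeLocations);
   ```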



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java:
##########
@@ -19,9 +19,21 @@
 package org.apache.polaris.service.catalog.io;
 
 import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
 
 /** Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3. */

Review Comment:
   Can you add a comment here saying something like
   ```
    *
    * <p>Implementations are available via CDI as {@link ApplicationScoped @ApplicationScoped} beans.
   ```
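
   i.e., the interface javadoc would then read something like:

   ```
   /**
    * Interface for providing a way to construct FileIO objects, such as for reading/writing S3.
    *
    * <p>Implementations are available via CDI as {@link ApplicationScoped @ApplicationScoped} beans.
    */
   public interface FileIOFactory {
   ```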
   



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,

Review Comment:
   Anyway, maybe rename this parameter to something like "icebergFileIoIdentifier".

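   Purely as illustration, the renamed signature could look like this (only the first parameter name changes; the body stays as in the diff):
   ```java
   @Override
   public FileIO loadFileIO(
       String icebergFileIoIdentifier, // was "impl": the Iceberg FileIO implementation class to load
       TableIdentifier identifier,
       Set<String> readLocations,
       PolarisResolvedPathWrapper resolvedStorageEntity,
       Map<String, String> tableProperties,
       Set<PolarisStorageActions> storageActions) {
     // ... body unchanged, ending in loadFileIO(icebergFileIoIdentifier, tableProperties)
   }
   ```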


##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO implementation needs them
+    Map<String, String> internalProperties =
+        storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        .containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  private Map<String, String> refreshCredentials(
+      TableIdentifier tableIdentifier,
+      Set<PolarisStorageActions> storageActions,
+      Set<String> tableLocations,
+      PolarisEntity entity) {
+    Boolean skipCredentialSubscopingIndirection =
+        getBooleanContextConfiguration(
+            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
+            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+    if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
+      LOGGER
+          .atInfo()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Skipping generation of subscoped creds for table");
+      return Map.of();
+    }
+
+    boolean allowList =
+        storageActions.contains(PolarisStorageActions.LIST)
+            || storageActions.contains(PolarisStorageActions.ALL);
+    Set<String> writeLocations =
+        storageActions.contains(PolarisStorageActions.WRITE)
+                || storageActions.contains(PolarisStorageActions.DELETE)
+                || storageActions.contains(PolarisStorageActions.ALL)
+            ? tableLocations
+            : Set.of();
+    Map<String, String> credentialsMap =
+        entityManager
+            .getCredentialCache()
+            .getOrGenerateSubScopeCreds(
+                getCredentialVendor(),
+                metaStoreSession,
+                entity,
+                allowList,
+                tableLocations,
+                writeLocations);
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .addKeyValue("credentialKeys", credentialsMap.keySet())
+        .log("Loaded scoped credentials for table");
+    if (credentialsMap.isEmpty()) {
+      LOGGER.debug("No credentials found for table");
+    }
+    return credentialsMap;
+  }
+
+  private PolarisCredentialVendor getCredentialVendor() {
+    return metaStoreManager;
+  }
+
+  private Boolean getBooleanContextConfiguration(String configKey, boolean defaultValue) {

Review Comment:
   feels easier to inline the function
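   If the helper just delegates to the injected configurationStore (its body is outside this hunk, so the exact lookup below is an assumption), the inlined call site could look roughly like:
   ```java
   // Hypothetical inlining at the only call site; the PolarisConfigurationStore lookup
   // shown here is assumed, not taken from the diff.
   Boolean skipCredentialSubscopingIndirection =
       configurationStore.getConfiguration(
           realmContext,
           PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
           PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
   ```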



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
