XJDKC commented on code in PR #724:
URL: https://github.com/apache/polaris/pull/724#discussion_r1925777263


##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  private Map<String, String> refreshCredentials(
+      TableIdentifier tableIdentifier,
+      Set<PolarisStorageActions> storageActions,
+      Set<String> tableLocations,
+      PolarisEntity entity) {
+    Boolean skipCredentialSubscopingIndirection =
+        getBooleanContextConfiguration(
+            PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key,
+            
PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue);
+    if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) {
+      LOGGER
+          .atInfo()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Skipping generation of subscoped creds for table");
+      return Map.of();
+    }
+
+    boolean allowList =
+        storageActions.contains(PolarisStorageActions.LIST)
+            || storageActions.contains(PolarisStorageActions.ALL);
+    Set<String> writeLocations =
+        storageActions.contains(PolarisStorageActions.WRITE)
+                || storageActions.contains(PolarisStorageActions.DELETE)
+                || storageActions.contains(PolarisStorageActions.ALL)
+            ? tableLocations
+            : Set.of();
+    Map<String, String> credentialsMap =
+        entityManager
+            .getCredentialCache()
+            .getOrGenerateSubScopeCreds(
+                getCredentialVendor(),
+                metaStoreSession,
+                entity,
+                allowList,
+                tableLocations,
+                writeLocations);
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .addKeyValue("credentialKeys", credentialsMap.keySet())
+        .log("Loaded scoped credentials for table");
+    if (credentialsMap.isEmpty()) {
+      LOGGER.debug("No credentials found for table");
+    }
+    return credentialsMap;
+  }
+
+  private PolarisCredentialVendor getCredentialVendor() {
+    return metaStoreManager;
+  }
+
+  private Boolean getBooleanContextConfiguration(String configKey, boolean 
defaultValue) {

Review Comment:
   Inline the function instead!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to