XJDKC commented on code in PR #724:
URL: https://github.com/apache/polaris/pull/724#discussion_r1925789588


##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -19,18 +19,157 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFileIOFactory.class);
+
+  private final RealmContext realmContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisMetaStoreSession metaStoreSession;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmContext realmContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisMetaStoreSession metaStoreSession,
+      PolarisConfigurationStore polarisConfigurationStore) {
+    this.realmContext = realmContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.metaStoreSession = metaStoreSession;
+    this.configurationStore = polarisConfigurationStore;
+  }
+
   @Override
   public FileIO loadFileIO(String impl, Map<String, String> properties) {
     return CatalogUtil.loadFileIO(impl, properties, new Configuration());
   }
+
+  @Override
+  public FileIO loadFileIO(
+      String impl,
+      TableIdentifier identifier,
+      Set<String> readLocations,
+      PolarisResolvedPathWrapper resolvedStorageEntity,
+      Map<String, String> tableProperties,
+      Set<PolarisStorageActions> storageActions) {
+    Optional<PolarisEntity> storageInfoEntity = 
findStorageInfoFromHierarchy(resolvedStorageEntity);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    refreshCredentials(identifier, storageActions, 
readLocations, storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file
+    // update with table properties in case there are table-level overrides
+    // the credentials should always override table-level properties, since
+    // storage configuration will be found at whatever entity defines it
+    tableProperties.putAll(credentialsMap);
+
+    // Propagate the internal properties to FileIO in case the FileIO 
implementation needs them
+    Map<String, String> internalProperties =
+        
storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).orElse(Map.of());
+    tableProperties.putAll(internalProperties);
+
+    FileIO fileIO = null;
+    fileIO = loadFileIO(impl, tableProperties);
+    return fileIO;
+  }
+
+  private static @Nonnull Optional<PolarisEntity> findStorageInfoFromHierarchy(
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+    Optional<PolarisEntity> storageInfoEntity =
+        resolvedStorageEntity.getRawFullPath().reversed().stream()
+            .filter(
+                e ->
+                    e.getInternalPropertiesAsMap()
+                        
.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .findFirst();
+    return storageInfoEntity;
+  }
+
+  private Map<String, String> refreshCredentials(

Review Comment:
   Create a separate utility class so that both `BasePolarisCatalog` and `FileIOFactory` can use the same methods.
   For `BasePolarisCatalog`, we need to generate/refresh subscoped credentials when the client sends a loadTable request.
   For `DefaultFileIOFactory`, we need to obtain subscoped credentials first so that we can access the storage and read/write the metadata JSON files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to