dimas-b commented on code in PR #724:
URL: https://github.com/apache/polaris/pull/724#discussion_r1929397864


##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java:
##########
@@ -18,10 +18,50 @@
  */
 package org.apache.polaris.service.catalog.io;
 
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
 import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
 
-/** Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3. */
+/**
+ * Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3.
+ *
+ * <p>Implementations are available via CDI as {@link ApplicationScoped 
@ApplicationScoped} beans.
+ */
 public interface FileIOFactory {
-  FileIO loadFileIO(String impl, Map<String, String> properties);
+
+  /** This method is intended for use in tests only. */

Review Comment:
   ... but it is called from `BasePolarisCatalog`. I think we ought to refactor 
that call path so that extension-point APIs do not have to provide different 
functionality in test contexts (this can be done later).



##########
service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java:
##########
@@ -1586,27 +1525,22 @@ protected String viewName() {
     }
   }
 
-  private FileIO refreshIOWithCredentials(
+  private FileIO refreshIOForTableLike(

Review Comment:
   nit: why call it "refresh FileIO"? It does not "refresh" any FileIO object; 
it creates a new one.



##########
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java:
##########
@@ -18,23 +18,32 @@
  */
 package org.apache.polaris.service.quarkus.catalog.io;
 
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.inject.Vetoed;
+import jakarta.inject.Inject;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Supplier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
 
 /**
  * A FileIOFactory that measures the number of bytes read, files written, and 
files deleted. It can
  * inject exceptions at various parts of the IO construction.
  */
 @Vetoed
-public class TestFileIOFactory extends DefaultFileIOFactory {
+public class TestFileIOFactory implements FileIOFactory {

Review Comment:
   nit: `TestFileIOFactory` -> `ObservingFileIOFactory` or 
`MeasuredFileIOFactory`...
   
   The name `TestFileIOFactory` is easily confused with `FileIOFactoryTest`.



##########
service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.io;
+
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.when;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.GoogleCredentials;
+import jakarta.annotation.Nonnull;
+import java.lang.reflect.Method;
+import java.time.Clock;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.*;
+import org.apache.polaris.core.persistence.*;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
+import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
+import org.apache.polaris.service.config.DefaultConfigurationStore;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
+import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.mockito.Mockito;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+public class FileIOFactoryTest {
+
+  public static final String TEST_ACCESS_KEY = "test_access_key";
+  public static final String SECRET_ACCESS_KEY = "secret_access_key";
+  public static final String SESSION_TOKEN = "session_token";
+
+  private RealmEntityManagerFactory realmEntityManagerFactory;
+  private PolarisStorageIntegrationProvider storageIntegrationProvider;
+  private MetaStoreManagerFactory metaStoreManagerFactory;
+  private PolarisDiagnostics polarisDiagnostics;
+  private PolarisEntityManager entityManager;
+  private PolarisMetaStoreManager metaStoreManager;
+  private PolarisMetaStoreSession metaStoreSession;
+  private PolarisConfigurationStore configurationStore;
+
+  private RealmId realmId;
+
+  @BeforeEach
+  public void before(TestInfo testInfo) {
+    String realmName =
+        "realm_%s_%s"
+            .formatted(
+                testInfo.getTestMethod().map(Method::getName).orElse("test"), 
System.nanoTime());
+    realmId = RealmId.newRealmId(realmName);
+    configurationStore = new DefaultConfigurationStore(Map.of());
+    polarisDiagnostics = Mockito.mock(PolarisDiagnostics.class);
+
+    StsClient stsClient = Mockito.mock(StsClient.class);
+    storageIntegrationProvider =
+        new PolarisStorageIntegrationProviderImpl(
+            () -> stsClient,
+            () -> GoogleCredentials.create(new AccessToken("abc", new Date())),
+            configurationStore);
+    metaStoreManagerFactory =
+        new InMemoryPolarisMetaStoreManagerFactory(
+            storageIntegrationProvider,
+            configurationStore,
+            polarisDiagnostics,
+            Clock.systemDefaultZone());
+    realmEntityManagerFactory =
+        new RealmEntityManagerFactory(metaStoreManagerFactory, 
polarisDiagnostics) {};
+    entityManager = 
realmEntityManagerFactory.getOrCreateEntityManager(realmId);
+    metaStoreManager = 
metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
+    metaStoreSession = 
metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
+
+    // Mock get subscoped creds
+    when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
+        .thenReturn(
+            AssumeRoleResponse.builder()
+                .credentials(
+                    Credentials.builder()
+                        .accessKeyId(TEST_ACCESS_KEY)
+                        .secretAccessKey(SECRET_ACCESS_KEY)
+                        .sessionToken(SESSION_TOKEN)
+                        .build())
+                .build());
+  }
+
+  @AfterEach
+  public void after() {
+    metaStoreManager.purge(metaStoreSession);
+  }
+
+  @Test
+  public void testLoadFileIOForCatalog() {
+    String testProperty = "test.property";
+    FileIOFactory fileIOFactory =
+        new DefaultFileIOFactory(
+            realmEntityManagerFactory, metaStoreManagerFactory, 
configurationStore) {
+          @Override
+          FileIO loadFileIOInternal(
+              @Nonnull String ioImplClassName, @Nonnull Map<String, String> 
properties) {
+            org.assertj.core.api.Assertions.assertThat(properties)
+                .containsEntry(testProperty, "true");
+            return super.loadFileIOInternal(ioImplClassName, properties);
+          }
+        };
+    fileIOFactory.loadFileIO(realmId, InMemoryFileIO.class.getName(), 
Map.of(testProperty, "true"));
+  }
+
+  @Test
+  public void testLoadFileIOForTableLike() {
+    String storageLocation = "s3://my-bucket/path/to/data";
+    AwsStorageConfigurationInfo storageConfig =
+        new AwsStorageConfigurationInfo(
+            PolarisStorageConfigurationInfo.StorageType.S3,
+            List.of(storageLocation, "s3://externally-owned-bucket"),
+            "arn:aws:iam::012345678901:role/jdoe",
+            null,
+            null);
+    ResolvedPolarisEntity catalogEntity =
+        createResolvedEntity(
+            10,
+            PolarisEntityType.CATALOG,
+            PolarisEntitySubType.NULL_SUBTYPE,
+            10,
+            0,
+            "my-catalog",
+            Map.of(
+                PolarisEntityConstants.getStorageConfigInfoPropertyName(),
+                storageConfig.serialize()));
+    ResolvedPolarisEntity tableEntity =
+        createResolvedEntity(
+            10,
+            PolarisEntityType.TABLE_LIKE,
+            PolarisEntitySubType.TABLE,
+            11,
+            10,
+            "my-table",
+            Map.of());
+    metaStoreManager.createCatalog(metaStoreSession, 
catalogEntity.getEntity(), List.of());
+    PolarisResolvedPathWrapper resolvedPathWrapper =
+        new PolarisResolvedPathWrapper(List.of(catalogEntity, tableEntity));
+
+    String testProperty = "test.property";
+    FileIOFactory fileIOFactory =
+        new DefaultFileIOFactory(
+            realmEntityManagerFactory, metaStoreManagerFactory, 
configurationStore) {
+          @Override
+          FileIO loadFileIOInternal(
+              @Nonnull String ioImplClassName, @Nonnull Map<String, String> 
properties) {
+            // properties should contain credentials
+            org.assertj.core.api.Assertions.assertThat(properties)
+                .containsEntry(S3FileIOProperties.ACCESS_KEY_ID, 
TEST_ACCESS_KEY)
+                .containsEntry(S3FileIOProperties.SECRET_ACCESS_KEY, 
SECRET_ACCESS_KEY)
+                .containsEntry(S3FileIOProperties.SESSION_TOKEN, SESSION_TOKEN)
+                .containsEntry(testProperty, "true");
+            return super.loadFileIOInternal(ioImplClassName, properties);
+          }
+        };
+    fileIOFactory
+        .loadFileIO(
+            realmId,
+            InMemoryFileIO.class.getName(),
+            Map.of(testProperty, "true"),
+            TableIdentifier.of("my-ns", "my-table"),
+            Set.of(storageLocation),
+            Set.of(PolarisStorageActions.READ),
+            resolvedPathWrapper)
+        .close();

Review Comment:
   nit: `assertThatCode(...).doesNotThrowAnyException()`



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java:
##########
@@ -18,10 +18,50 @@
  */
 package org.apache.polaris.service.catalog.io;
 
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
 import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
 
-/** Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3. */
+/**
+ * Interface for providing a way to construct FileIO objects, such as for 
reading/writing S3.
+ *
+ * <p>Implementations are available via CDI as {@link ApplicationScoped 
@ApplicationScoped} beans.

Review Comment:
   nit: why mention `@ApplicationScoped` on the interface that does not carry 
this annotation? IMHO, this is not an API concern, but an integration/build 
concern.



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java:
##########
@@ -18,19 +18,112 @@
  */
 package org.apache.polaris.service.catalog.io;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
 
-/** A simple FileIOFactory implementation that defers all the work to the 
Iceberg SDK */
+/**
+ * A default FileIO factory implementation for creating Iceberg {@link FileIO} 
instances with
+ * contextual table-level properties.
+ *
+ * <p>This class acts as a translation layer between Polaris properties and 
the properties required
+ * by Iceberg's {@link FileIO}. For example, it evaluates storage actions and 
retrieves subscoped
+ * credentials to initialize a {@link FileIO} instance with the most limited 
permissions necessary.
+ */
 @ApplicationScoped
 @Identifier("default")
 public class DefaultFileIOFactory implements FileIOFactory {
+
+  private final RealmEntityManagerFactory realmEntityManagerFactory;
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public DefaultFileIOFactory(
+      RealmEntityManagerFactory realmEntityManagerFactory,
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      PolarisConfigurationStore configurationStore) {
+    this.realmEntityManagerFactory = realmEntityManagerFactory;
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.configurationStore = configurationStore;
+  }
+
+  @Override
+  public FileIO loadFileIO(
+      @Nonnull RealmId realmId,
+      @Nonnull String ioImplClassName,
+      @Nonnull Map<String, String> properties) {
+    return loadFileIOInternal(ioImplClassName, properties);
+  }
+
   @Override
-  public FileIO loadFileIO(String impl, Map<String, String> properties) {
-    return CatalogUtil.loadFileIO(impl, properties, new Configuration());
+  public FileIO loadFileIO(
+      @Nonnull RealmId realmId,
+      @Nonnull String ioImplClassName,
+      @Nonnull Map<String, String> properties,
+      @Nonnull TableIdentifier identifier,
+      @Nonnull Set<String> tableLocations,
+      @Nonnull Set<PolarisStorageActions> storageActions,
+      @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
+    PolarisEntityManager entityManager =
+        realmEntityManagerFactory.getOrCreateEntityManager(realmId);
+    PolarisCredentialVendor credentialVendor =
+        metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId);
+    PolarisMetaStoreSession metaStoreSession =
+        metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get();
+
+    // Get subcoped creds
+    properties = new HashMap<>(properties);
+    Optional<PolarisEntity> storageInfoEntity =
+        FileIOUtil.findStorageInfoFromHierarchy(resolvedEntityPath);
+    Map<String, String> credentialsMap =
+        storageInfoEntity
+            .map(
+                storageInfo ->
+                    FileIOUtil.refreshCredentials(
+                        realmId,
+                        entityManager,
+                        credentialVendor,
+                        metaStoreSession,
+                        configurationStore,
+                        identifier,
+                        tableLocations,
+                        storageActions,
+                        storageInfo))
+            .orElse(Map.of());
+
+    // Update the FileIO before we write the new metadata file

Review Comment:
   What are we updating here? And why mention "metadata file"?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java:
##########
@@ -19,19 +19,63 @@
 package org.apache.polaris.service.catalog.io;
 
 import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.Nonnull;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
 
 /** A {@link FileIOFactory} that translates WASB paths to ABFS ones */
 @ApplicationScoped
 @Identifier("wasb")
 public class WasbTranslatingFileIOFactory implements FileIOFactory {
+
+  private final FileIOFactory defaultFileIOFactory;
+
+  @Inject
+  public WasbTranslatingFileIOFactory(
+      RealmEntityManagerFactory realmEntityManagerFactory,
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      PolarisConfigurationStore configurationStore) {
+    defaultFileIOFactory =
+        new DefaultFileIOFactory(

Review Comment:
   How is the behaviour of `WasbTranslatingFileIOFactory` different from 
`DefaultFileIOFactory`? Why do we need `WasbTranslatingFileIOFactory`?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.io;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmId;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.storage.PolarisCredentialVendor;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.catalog.BasePolarisCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileIOUtil {

Review Comment:
   nit: for a `public` util class with only static methods, it is best to add a 
`private` constructor to prevent accidental instantiation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to