This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new ed029d1cd [Catalog Federation] Block credential vending for remote 
tables outside allowed location list (#2791)
ed029d1cd is described below

commit ed029d1cd1e3b6856530fcc95edc5d7ec2781eb2
Author: Honah (Jonas) J. <[email protected]>
AuthorDate: Mon Oct 13 13:07:28 2025 -0500

    [Catalog Federation] Block credential vending for remote tables outside 
allowed location list (#2791)
---
 .../it/test/CatalogFederationIntegrationTest.java  | 85 +++++++++++++++++++---
 .../service/catalog/common/CatalogUtils.java       | 48 ++++++++++++
 .../service/catalog/iceberg/IcebergCatalog.java    | 39 +---------
 .../catalog/iceberg/IcebergCatalogHandler.java     | 37 +++++++++-
 4 files changed, 164 insertions(+), 45 deletions(-)

diff --git 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
index af9937b3d..287b8f6d3 100644
--- 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
+++ 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
@@ -86,7 +86,10 @@ public class CatalogFederationIntegrationTest {
   private static String federatedCatalogName;
   private static String localCatalogRoleName;
   private static String federatedCatalogRoleName;
-  private static URI storageBase;
+  private static URI localStorageBase;
+  private static URI remoteStorageBase;
+  private static URI remoteStorageExtraAllowedLocationNs1;
+  private static URI remoteStorageExtraAllowedLocationNs2;
   private static String endpoint;
 
   private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
@@ -99,7 +102,6 @@ public class CatalogFederationIntegrationTest {
 
   @TempDir static java.nio.file.Path warehouseDir;
 
-  private URI baseLocation;
   private PrincipalWithCredentials newUserCredentials;
 
   @BeforeAll
@@ -112,8 +114,15 @@ public class CatalogFederationIntegrationTest {
     String adminToken = client.obtainToken(credentials);
     managementApi = client.managementApi(adminToken);
     catalogApi = client.catalogApi(adminToken);
-    storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX);
     endpoint = minioAccess.s3endpoint();
+
+    localStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + 
"/local_catalog");
+    remoteStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + 
"/federated_catalog");
+    // Allow credential vending for tables located under ns1 and ns2
+    remoteStorageExtraAllowedLocationNs1 =
+        minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
+    remoteStorageExtraAllowedLocationNs2 =
+        minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
   }
 
   @AfterAll
@@ -144,7 +153,6 @@ public class CatalogFederationIntegrationTest {
   }
 
   private void setupCatalogs() {
-    baseLocation = storageBase;
     newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, 
PRINCIPAL_ROLE_NAME);
 
     AwsStorageConfigInfo storageConfig =
@@ -152,10 +160,10 @@ public class CatalogFederationIntegrationTest {
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
             .setPathStyleAccess(true)
             .setEndpoint(endpoint)
-            .setAllowedLocations(List.of(baseLocation.toString()))
+            .setAllowedLocations(List.of(localStorageBase.toString()))
             .build();
 
-    CatalogProperties catalogProperties = new 
CatalogProperties(baseLocation.toString());
+    CatalogProperties catalogProperties = new 
CatalogProperties(localStorageBase.toString());
 
     localCatalogName = "test_catalog_local_" + 
UUID.randomUUID().toString().replace("-", "");
     localCatalogRoleName = "test-catalog-role_" + 
UUID.randomUUID().toString().replace("-", "");
@@ -193,13 +201,26 @@ public class CatalogFederationIntegrationTest {
             .setRemoteCatalogName(localCatalogName)
             .setAuthenticationParameters(authParams)
             .build();
+    CatalogProperties externalCatalogProperties =
+        new CatalogProperties(remoteStorageBase.toString());
+    AwsStorageConfigInfo externalStorageConfig =
+        AwsStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setPathStyleAccess(true)
+            .setEndpoint(endpoint)
+            .setAllowedLocations(
+                List.of(
+                    remoteStorageBase.toString(),
+                    remoteStorageExtraAllowedLocationNs1.toString(),
+                    remoteStorageExtraAllowedLocationNs2.toString()))
+            .build();
     ExternalCatalog externalCatalog =
         ExternalCatalog.builder()
             .setType(Catalog.TypeEnum.EXTERNAL)
             .setName(federatedCatalogName)
             .setConnectionConfigInfo(connectionConfig)
-            .setProperties(catalogProperties)
-            .setStorageConfigInfo(storageConfig)
+            .setProperties(externalCatalogProperties)
+            .setStorageConfigInfo(externalStorageConfig)
             .build();
     managementApi.createCatalog(externalCatalog);
     managementApi.createCatalogRole(federatedCatalogName, 
federatedCatalogRoleName);
@@ -244,6 +265,11 @@ public class CatalogFederationIntegrationTest {
     spark.sql("INSERT INTO ns2.test_table VALUES (1, 'Apache Spark')");
     spark.sql("INSERT INTO ns2.test_table VALUES (2, 'Apache Iceberg')");
 
+    spark.sql("CREATE NAMESPACE IF NOT EXISTS ns3");
+    spark.sql("CREATE TABLE IF NOT EXISTS ns3.test_table (id int, name 
string)");
+    spark.sql("INSERT INTO ns3.test_table VALUES (1, 'Apache Spark')");
+    spark.sql("INSERT INTO ns3.test_table VALUES (2, 'Apache Iceberg')");
+
     spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1.ns1a");
     spark.sql("CREATE TABLE IF NOT EXISTS ns1.ns1a.test_table (id int, name 
string)");
     spark.sql("INSERT INTO ns1.ns1a.test_table VALUES (1, 'Alice')");
@@ -256,7 +282,7 @@ public class CatalogFederationIntegrationTest {
   void testFederatedCatalogBasicReadWriteOperations() {
     spark.sql("USE " + federatedCatalogName);
     List<Row> namespaces = spark.sql("SHOW NAMESPACES").collectAsList();
-    assertThat(namespaces).hasSize(2);
+    assertThat(namespaces).hasSize(3);
     List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY 
id").collectAsList();
     List<Row> refNs1Data =
         spark
@@ -428,4 +454,45 @@ public class CatalogFederationIntegrationTest {
     assertThat(localData.get(2).getInt(0)).isEqualTo(3);
     assertThat(localData.get(2).getString(1)).isEqualTo("Charlie");
   }
+
+  @Test
+  void testFederatedCatalogNotVendCredentialForTablesOutsideAllowedLocations() 
{
+    managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, 
defaultCatalogGrant);
+
+    spark.sql("USE " + federatedCatalogName);
+
+    // Case 1: Only have TABLE_READ_DATA privilege
+    TableGrant tableReadDataGrant =
+        TableGrant.builder()
+            .setType(GrantResource.TypeEnum.TABLE)
+            .setPrivilege(TablePrivilege.TABLE_READ_DATA)
+            .setNamespace(List.of("ns3"))
+            .setTableName("test_table")
+            .build();
+    managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, 
tableReadDataGrant);
+
+    // Verify that credential vending is blocked for table under ns3, even 
with enough privilege
+    assertThatThrownBy(() -> spark.sql("SELECT * FROM ns3.test_table ORDER BY 
id").collectAsList())
+        .isInstanceOf(ForbiddenException.class)
+        .hasMessageContaining(
+            "Table 'ns3.test_table' in remote catalog has locations outside 
catalog's allowed locations:");
+
+    // Case 2: Only have TABLE_WRITE_DATA privilege
+    managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, 
tableReadDataGrant);
+    TableGrant tableWriteDataGrant =
+        TableGrant.builder()
+            .setType(GrantResource.TypeEnum.TABLE)
+            .setPrivilege(TablePrivilege.TABLE_WRITE_DATA)
+            .setNamespace(List.of("ns3"))
+            .setTableName("test_table")
+            .build();
+    managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, 
tableWriteDataGrant);
+
+    // Verify that credential vending is blocked for table under ns3, even 
with enough privilege
+    assertThatThrownBy(
+            () -> spark.sql("INSERT INTO ns3.test_table VALUES (3, 
'Charlie')").collectAsList())
+        .isInstanceOf(ForbiddenException.class)
+        .hasMessageContaining(
+            "Table 'ns3.test_table' in remote catalog has locations outside 
catalog's allowed locations:");
+  }
 }
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
index 6c0d8d82a..ce82f36e3 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
@@ -19,11 +19,18 @@
 
 package org.apache.polaris.service.catalog.common;
 
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
 
 /** Utility methods for working with Polaris catalog entities. */
 public class CatalogUtils {
@@ -45,4 +52,45 @@ public class CatalogUtils {
     }
     return resolvedEntityView.getResolvedPath(tableIdentifier.namespace());
   }
+
+  /**
+   * Validates that the specified {@code locations} are valid for whatever 
storage config is found
+   * for the given entity's parent hierarchy.
+   *
+   * @param realmConfig the realm configuration
+   * @param identifier the table identifier (for error messages)
+   * @param locations the set of locations to validate (base location + 
write.data.path +
+   *     write.metadata.path)
+   * @param resolvedStorageEntity the resolved path wrapper containing storage 
configuration
+   * @throws ForbiddenException if any location is outside the allowed 
locations or if file
+   *     locations are not allowed
+   */
+  public static void validateLocationsForTableLike(
+      RealmConfig realmConfig,
+      TableIdentifier identifier,
+      Set<String> locations,
+      PolarisResolvedPathWrapper resolvedStorageEntity) {
+
+    PolarisStorageConfigurationInfo.forEntityPath(
+            realmConfig, resolvedStorageEntity.getRawFullPath())
+        .ifPresentOrElse(
+            restrictions -> restrictions.validate(realmConfig, identifier, 
locations),
+            () -> {
+              List<String> allowedStorageTypes =
+                  realmConfig.getConfig("SUPPORTED_CATALOG_STORAGE_TYPES");
+              if (allowedStorageTypes != null
+                  && 
!allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
+                List<String> invalidLocations =
+                    locations.stream()
+                        .filter(
+                            location -> location.startsWith("file:") || 
location.startsWith("http"))
+                        .collect(Collectors.toList());
+                if (!invalidLocations.isEmpty()) {
+                  throw new ForbiddenException(
+                      "Invalid locations '%s' for identifier '%s': File 
locations are not allowed",
+                      invalidLocations, identifier);
+                }
+              }
+            });
+  }
 }
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index 10ac47b43..c3e248b52 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -90,7 +90,6 @@ import org.apache.iceberg.view.ViewProperties;
 import org.apache.iceberg.view.ViewUtil;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
-import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
 import org.apache.polaris.core.config.BehaviorChangeConfiguration;
 import org.apache.polaris.core.config.FeatureConfiguration;
@@ -122,7 +121,6 @@ import 
org.apache.polaris.core.persistence.resolver.ResolverFactory;
 import org.apache.polaris.core.persistence.resolver.ResolverPath;
 import org.apache.polaris.core.persistence.resolver.ResolverStatus;
 import org.apache.polaris.core.storage.PolarisStorageActions;
-import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
 import org.apache.polaris.core.storage.StorageLocation;
 import org.apache.polaris.core.storage.StorageUtil;
 import org.apache.polaris.service.catalog.SupportsNotifications;
@@ -959,38 +957,8 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
       TableIdentifier identifier,
       String location,
       PolarisResolvedPathWrapper resolvedStorageEntity) {
-    validateLocationsForTableLike(identifier, Set.of(location), 
resolvedStorageEntity);
-  }
-
-  /**
-   * Validates that the specified {@code locations} are valid for whatever 
storage config is found
-   * for this TableLike's parent hierarchy.
-   */
-  private void validateLocationsForTableLike(
-      TableIdentifier identifier,
-      Set<String> locations,
-      PolarisResolvedPathWrapper resolvedStorageEntity) {
-
-    PolarisStorageConfigurationInfo.forEntityPath(
-            realmConfig, resolvedStorageEntity.getRawFullPath())
-        .ifPresentOrElse(
-            restrictions -> restrictions.validate(realmConfig, identifier, 
locations),
-            () -> {
-              List<String> allowedStorageTypes =
-                  
realmConfig.getConfig(FeatureConfiguration.SUPPORTED_CATALOG_STORAGE_TYPES);
-              if 
(!allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
-                List<String> invalidLocations =
-                    locations.stream()
-                        .filter(
-                            location -> location.startsWith("file:") || 
location.startsWith("http"))
-                        .collect(Collectors.toList());
-                if (!invalidLocations.isEmpty()) {
-                  throw new ForbiddenException(
-                      "Invalid locations '%s' for identifier '%s': File 
locations are not allowed",
-                      invalidLocations, identifier);
-                }
-              }
-            });
+    CatalogUtils.validateLocationsForTableLike(
+        realmConfig, identifier, Set.of(location), resolvedStorageEntity);
   }
 
   /**
@@ -1486,7 +1454,8 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
         // for the storage configuration inherited under this entity's path.
         Set<String> dataLocations =
             StorageUtil.getLocationsUsedByTable(metadata.location(), 
metadata.properties());
-        validateLocationsForTableLike(tableIdentifier, dataLocations, 
resolvedStorageEntity);
+        CatalogUtils.validateLocationsForTableLike(
+            realmConfig, tableIdentifier, dataLocations, 
resolvedStorageEntity);
         // also validate that the table location doesn't overlap an existing 
table
         dataLocations.forEach(
             location ->
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 833c0ab11..07af147d7 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -810,11 +810,19 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
     if (baseCatalog instanceof IcebergCatalog
         || realmConfig.getConfig(
             ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, 
getResolvedCatalogEntity())) {
+
+      Set<String> tableLocations = 
StorageUtil.getLocationsUsedByTable(tableMetadata);
+
+      // For non-Polaris catalogs, validate that table locations are within 
allowed locations
+      if (!(baseCatalog instanceof IcebergCatalog)) {
+        validateRemoteTableLocations(tableIdentifier, tableLocations, 
resolvedStoragePath);
+      }
+
       AccessConfig accessConfig =
           accessConfigProvider.getAccessConfig(
               callContext,
               tableIdentifier,
-              StorageUtil.getLocationsUsedByTable(tableMetadata),
+              tableLocations,
               actions,
               refreshCredentialsEndpoint,
               resolvedStoragePath);
@@ -842,6 +850,33 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
     return responseBuilder;
   }
 
+  private void validateRemoteTableLocations(
+      TableIdentifier tableIdentifier,
+      Set<String> tableLocations,
+      PolarisResolvedPathWrapper resolvedStoragePath) {
+
+    try {
+      // Delegate to common validation logic
+      CatalogUtils.validateLocationsForTableLike(
+          realmConfig, tableIdentifier, tableLocations, resolvedStoragePath);
+
+      LOGGER
+          .atInfo()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .addKeyValue("tableLocations", tableLocations)
+          .log("Validated federated table locations");
+    } catch (ForbiddenException e) {
+      LOGGER
+          .atError()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .addKeyValue("tableLocations", tableLocations)
+          .log("Federated table locations validation failed");
+      throw new ForbiddenException(
+          "Table '%s' in remote catalog has locations outside catalog's 
allowed locations: %s",
+          tableIdentifier, e.getMessage());
+    }
+  }
+
   private UpdateTableRequest applyUpdateFilters(UpdateTableRequest request) {
     // Certain MetadataUpdates need to be explicitly transformed to achieve 
the same behavior
     // as using a local Catalog client via TableBuilder.

Reply via email to