This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new ed029d1cd [Catalog Federation] Block credential vending for remote
tables outside allowed location list (#2791)
ed029d1cd is described below
commit ed029d1cd1e3b6856530fcc95edc5d7ec2781eb2
Author: Honah (Jonas) J. <[email protected]>
AuthorDate: Mon Oct 13 13:07:28 2025 -0500
[Catalog Federation] Block credential vending for remote tables outside
allowed location list (#2791)
---
.../it/test/CatalogFederationIntegrationTest.java | 85 +++++++++++++++++++---
.../service/catalog/common/CatalogUtils.java | 48 ++++++++++++
.../service/catalog/iceberg/IcebergCatalog.java | 39 +---------
.../catalog/iceberg/IcebergCatalogHandler.java | 37 +++++++++-
4 files changed, 164 insertions(+), 45 deletions(-)
diff --git
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
index af9937b3d..287b8f6d3 100644
---
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
+++
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
@@ -86,7 +86,10 @@ public class CatalogFederationIntegrationTest {
private static String federatedCatalogName;
private static String localCatalogRoleName;
private static String federatedCatalogRoleName;
- private static URI storageBase;
+ private static URI localStorageBase;
+ private static URI remoteStorageBase;
+ private static URI remoteStorageExtraAllowedLocationNs1;
+ private static URI remoteStorageExtraAllowedLocationNs2;
private static String endpoint;
private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
@@ -99,7 +102,6 @@ public class CatalogFederationIntegrationTest {
@TempDir static java.nio.file.Path warehouseDir;
- private URI baseLocation;
private PrincipalWithCredentials newUserCredentials;
@BeforeAll
@@ -112,8 +114,15 @@ public class CatalogFederationIntegrationTest {
String adminToken = client.obtainToken(credentials);
managementApi = client.managementApi(adminToken);
catalogApi = client.catalogApi(adminToken);
- storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX);
endpoint = minioAccess.s3endpoint();
+
+ localStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX +
"/local_catalog");
+ remoteStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX +
"/federated_catalog");
+ // Allow credential vending for tables located under ns1 and ns2
+ remoteStorageExtraAllowedLocationNs1 =
+ minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
+ remoteStorageExtraAllowedLocationNs2 =
+ minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
}
@AfterAll
@@ -144,7 +153,6 @@ public class CatalogFederationIntegrationTest {
}
private void setupCatalogs() {
- baseLocation = storageBase;
newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME,
PRINCIPAL_ROLE_NAME);
AwsStorageConfigInfo storageConfig =
@@ -152,10 +160,10 @@ public class CatalogFederationIntegrationTest {
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setPathStyleAccess(true)
.setEndpoint(endpoint)
- .setAllowedLocations(List.of(baseLocation.toString()))
+ .setAllowedLocations(List.of(localStorageBase.toString()))
.build();
- CatalogProperties catalogProperties = new
CatalogProperties(baseLocation.toString());
+ CatalogProperties catalogProperties = new
CatalogProperties(localStorageBase.toString());
localCatalogName = "test_catalog_local_" +
UUID.randomUUID().toString().replace("-", "");
localCatalogRoleName = "test-catalog-role_" +
UUID.randomUUID().toString().replace("-", "");
@@ -193,13 +201,26 @@ public class CatalogFederationIntegrationTest {
.setRemoteCatalogName(localCatalogName)
.setAuthenticationParameters(authParams)
.build();
+ CatalogProperties externalCatalogProperties =
+ new CatalogProperties(remoteStorageBase.toString());
+ AwsStorageConfigInfo externalStorageConfig =
+ AwsStorageConfigInfo.builder()
+ .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+ .setPathStyleAccess(true)
+ .setEndpoint(endpoint)
+ .setAllowedLocations(
+ List.of(
+ remoteStorageBase.toString(),
+ remoteStorageExtraAllowedLocationNs1.toString(),
+ remoteStorageExtraAllowedLocationNs2.toString()))
+ .build();
ExternalCatalog externalCatalog =
ExternalCatalog.builder()
.setType(Catalog.TypeEnum.EXTERNAL)
.setName(federatedCatalogName)
.setConnectionConfigInfo(connectionConfig)
- .setProperties(catalogProperties)
- .setStorageConfigInfo(storageConfig)
+ .setProperties(externalCatalogProperties)
+ .setStorageConfigInfo(externalStorageConfig)
.build();
managementApi.createCatalog(externalCatalog);
managementApi.createCatalogRole(federatedCatalogName,
federatedCatalogRoleName);
@@ -244,6 +265,11 @@ public class CatalogFederationIntegrationTest {
spark.sql("INSERT INTO ns2.test_table VALUES (1, 'Apache Spark')");
spark.sql("INSERT INTO ns2.test_table VALUES (2, 'Apache Iceberg')");
+ spark.sql("CREATE NAMESPACE IF NOT EXISTS ns3");
+ spark.sql("CREATE TABLE IF NOT EXISTS ns3.test_table (id int, name
string)");
+ spark.sql("INSERT INTO ns3.test_table VALUES (1, 'Apache Spark')");
+ spark.sql("INSERT INTO ns3.test_table VALUES (2, 'Apache Iceberg')");
+
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1.ns1a");
spark.sql("CREATE TABLE IF NOT EXISTS ns1.ns1a.test_table (id int, name
string)");
spark.sql("INSERT INTO ns1.ns1a.test_table VALUES (1, 'Alice')");
@@ -256,7 +282,7 @@ public class CatalogFederationIntegrationTest {
void testFederatedCatalogBasicReadWriteOperations() {
spark.sql("USE " + federatedCatalogName);
List<Row> namespaces = spark.sql("SHOW NAMESPACES").collectAsList();
- assertThat(namespaces).hasSize(2);
+ assertThat(namespaces).hasSize(3);
List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY
id").collectAsList();
List<Row> refNs1Data =
spark
@@ -428,4 +454,45 @@ public class CatalogFederationIntegrationTest {
assertThat(localData.get(2).getInt(0)).isEqualTo(3);
assertThat(localData.get(2).getString(1)).isEqualTo("Charlie");
}
+
+ // Verifies that the federated catalog refuses to vend credentials for ns3.test_table: both a
+ // read and a write are expected to fail with ForbiddenException even when the matching data
+ // privilege has been granted on the table.
+ @Test
+ void testFederatedCatalogNotVendCredentialForTablesOutsideAllowedLocations()
{
+ // Revoke the default catalog-level grant; presumably so that only the table-level grants
+ // added below are in effect for the role (confirm against the test setup).
+ managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName,
defaultCatalogGrant);
+
+ spark.sql("USE " + federatedCatalogName);
+
+ // Case 1: Only have TABLE_READ_DATA privilege
+ TableGrant tableReadDataGrant =
+ TableGrant.builder()
+ .setType(GrantResource.TypeEnum.TABLE)
+ .setPrivilege(TablePrivilege.TABLE_READ_DATA)
+ .setNamespace(List.of("ns3"))
+ .setTableName("test_table")
+ .build();
+ managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName,
tableReadDataGrant);
+
+ // Verify that credential vending is blocked for the table under ns3 on read, even with
+ // sufficient privilege
+ assertThatThrownBy(() -> spark.sql("SELECT * FROM ns3.test_table ORDER BY
id").collectAsList())
+ .isInstanceOf(ForbiddenException.class)
+ .hasMessageContaining(
+ "Table 'ns3.test_table' in remote catalog has locations outside
catalog's allowed locations:");
+
+ // Case 2: Only have TABLE_WRITE_DATA privilege (the read grant is revoked first)
+ managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName,
tableReadDataGrant);
+ TableGrant tableWriteDataGrant =
+ TableGrant.builder()
+ .setType(GrantResource.TypeEnum.TABLE)
+ .setPrivilege(TablePrivilege.TABLE_WRITE_DATA)
+ .setNamespace(List.of("ns3"))
+ .setTableName("test_table")
+ .build();
+ managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName,
tableWriteDataGrant);
+
+ // Verify that credential vending is blocked for the table under ns3 on write, even with
+ // sufficient privilege
+ assertThatThrownBy(
+ () -> spark.sql("INSERT INTO ns3.test_table VALUES (3,
'Charlie')").collectAsList())
+ .isInstanceOf(ForbiddenException.class)
+ .hasMessageContaining(
+ "Table 'ns3.test_table' in remote catalog has locations outside
catalog's allowed locations:");
+ }
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
index 6c0d8d82a..ce82f36e3 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java
@@ -19,11 +19,18 @@
package org.apache.polaris.service.catalog.common;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
/** Utility methods for working with Polaris catalog entities. */
public class CatalogUtils {
@@ -45,4 +52,45 @@ public class CatalogUtils {
}
return resolvedEntityView.getResolvedPath(tableIdentifier.namespace());
}
+
+ /**
+  * Checks a table-like entity's {@code locations} against the storage configuration resolved
+  * from the entity's parent hierarchy.
+  *
+  * <p>When a storage configuration is found for the entity path, validation is delegated to it.
+  * When none is found, the only check applied is that {@code file:} / {@code http} locations
+  * are rejected if the realm's supported catalog storage types are configured and do not
+  * include FILE.
+  *
+  * @param realmConfig the realm configuration
+  * @param identifier the table identifier (for error messages)
+  * @param locations the set of locations to validate (base location + write.data.path +
+  *     write.metadata.path)
+  * @param resolvedStorageEntity the resolved path wrapper containing storage configuration
+  * @throws ForbiddenException if any location is outside the allowed locations or if file
+  *     locations are not allowed
+  */
+ public static void validateLocationsForTableLike(
+     RealmConfig realmConfig,
+     TableIdentifier identifier,
+     Set<String> locations,
+     PolarisResolvedPathWrapper resolvedStorageEntity) {
+   PolarisStorageConfigurationInfo.forEntityPath(
+           realmConfig, resolvedStorageEntity.getRawFullPath())
+       .ifPresentOrElse(
+           storageConfig -> storageConfig.validate(realmConfig, identifier, locations),
+           () -> rejectFileLocationsWhenUnsupported(realmConfig, identifier, locations));
+ }
+
+ /**
+  * Fallback check used when no storage configuration is found: rejects {@code file:} / {@code
+  * http} locations unless FILE is among the realm's supported catalog storage types.
+  */
+ private static void rejectFileLocationsWhenUnsupported(
+     RealmConfig realmConfig, TableIdentifier identifier, Set<String> locations) {
+   List<String> supportedTypes = realmConfig.getConfig("SUPPORTED_CATALOG_STORAGE_TYPES");
+   // Nothing to enforce when the setting is absent or FILE storage is explicitly supported.
+   if (supportedTypes == null
+       || supportedTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
+     return;
+   }
+
+   List<String> rejected =
+       locations.stream()
+           .filter(candidate -> candidate.startsWith("file:") || candidate.startsWith("http"))
+           .collect(Collectors.toList());
+   if (rejected.isEmpty()) {
+     return;
+   }
+
+   throw new ForbiddenException(
+       "Invalid locations '%s' for identifier '%s': File locations are not allowed",
+       rejected, identifier);
+ }
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index 10ac47b43..c3e248b52 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -90,7 +90,6 @@ import org.apache.iceberg.view.ViewProperties;
import org.apache.iceberg.view.ViewUtil;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
-import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
import org.apache.polaris.core.config.BehaviorChangeConfiguration;
import org.apache.polaris.core.config.FeatureConfiguration;
@@ -122,7 +121,6 @@ import
org.apache.polaris.core.persistence.resolver.ResolverFactory;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.core.storage.PolarisStorageActions;
-import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.core.storage.StorageUtil;
import org.apache.polaris.service.catalog.SupportsNotifications;
@@ -959,38 +957,8 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
TableIdentifier identifier,
String location,
PolarisResolvedPathWrapper resolvedStorageEntity) {
- validateLocationsForTableLike(identifier, Set.of(location),
resolvedStorageEntity);
- }
-
- /**
- * Validates that the specified {@code locations} are valid for whatever
storage config is found
- * for this TableLike's parent hierarchy.
- */
- private void validateLocationsForTableLike(
- TableIdentifier identifier,
- Set<String> locations,
- PolarisResolvedPathWrapper resolvedStorageEntity) {
-
- PolarisStorageConfigurationInfo.forEntityPath(
- realmConfig, resolvedStorageEntity.getRawFullPath())
- .ifPresentOrElse(
- restrictions -> restrictions.validate(realmConfig, identifier,
locations),
- () -> {
- List<String> allowedStorageTypes =
-
realmConfig.getConfig(FeatureConfiguration.SUPPORTED_CATALOG_STORAGE_TYPES);
- if
(!allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
- List<String> invalidLocations =
- locations.stream()
- .filter(
- location -> location.startsWith("file:") ||
location.startsWith("http"))
- .collect(Collectors.toList());
- if (!invalidLocations.isEmpty()) {
- throw new ForbiddenException(
- "Invalid locations '%s' for identifier '%s': File
locations are not allowed",
- invalidLocations, identifier);
- }
- }
- });
+ CatalogUtils.validateLocationsForTableLike(
+ realmConfig, identifier, Set.of(location), resolvedStorageEntity);
}
/**
@@ -1486,7 +1454,8 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
// for the storage configuration inherited under this entity's path.
Set<String> dataLocations =
StorageUtil.getLocationsUsedByTable(metadata.location(),
metadata.properties());
- validateLocationsForTableLike(tableIdentifier, dataLocations,
resolvedStorageEntity);
+ CatalogUtils.validateLocationsForTableLike(
+ realmConfig, tableIdentifier, dataLocations,
resolvedStorageEntity);
// also validate that the table location doesn't overlap an existing
table
dataLocations.forEach(
location ->
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index 833c0ab11..07af147d7 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -810,11 +810,19 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
if (baseCatalog instanceof IcebergCatalog
|| realmConfig.getConfig(
ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING,
getResolvedCatalogEntity())) {
+
+ Set<String> tableLocations =
StorageUtil.getLocationsUsedByTable(tableMetadata);
+
+ // For remote (non-Polaris) catalogs, validate that table locations are within allowed locations
+ if (!(baseCatalog instanceof IcebergCatalog)) {
+ validateRemoteTableLocations(tableIdentifier, tableLocations,
resolvedStoragePath);
+ }
+
AccessConfig accessConfig =
accessConfigProvider.getAccessConfig(
callContext,
tableIdentifier,
- StorageUtil.getLocationsUsedByTable(tableMetadata),
+ tableLocations,
actions,
refreshCredentialsEndpoint,
resolvedStoragePath);
@@ -842,6 +850,33 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
return responseBuilder;
}
+ /**
+  * Validates the locations used by a table from a remote (federated) catalog by delegating to
+  * {@link CatalogUtils#validateLocationsForTableLike}, and logs the outcome.
+  *
+  * @param tableIdentifier identifier of the remote table (used in logs and error messages)
+  * @param tableLocations the locations used by the table
+  * @param resolvedStoragePath resolved entity path passed to the common validation logic
+  * @throws ForbiddenException if the common validation rejects the locations; the original
+  *     exception is attached as the cause
+  */
+ private void validateRemoteTableLocations(
+     TableIdentifier tableIdentifier,
+     Set<String> tableLocations,
+     PolarisResolvedPathWrapper resolvedStoragePath) {
+
+   try {
+     // Delegate to common validation logic
+     CatalogUtils.validateLocationsForTableLike(
+         realmConfig, tableIdentifier, tableLocations, resolvedStoragePath);
+
+     LOGGER
+         .atInfo()
+         .addKeyValue("tableIdentifier", tableIdentifier)
+         .addKeyValue("tableLocations", tableLocations)
+         .log("Validated federated table locations");
+   } catch (ForbiddenException e) {
+     LOGGER
+         .atError()
+         .addKeyValue("tableIdentifier", tableIdentifier)
+         .addKeyValue("tableLocations", tableLocations)
+         .log("Federated table locations validation failed");
+     // Chain the original exception so its stack trace is not lost when the error is
+     // re-raised with the federation-specific message.
+     throw new ForbiddenException(
+         e,
+         "Table '%s' in remote catalog has locations outside catalog's allowed locations: %s",
+         tableIdentifier,
+         e.getMessage());
+   }
+ }
+
private UpdateTableRequest applyUpdateFilters(UpdateTableRequest request) {
// Certain MetadataUpdates need to be explicitly transformed to achieve
the same behavior
// as using a local Catalog client via TableBuilder.