singhpk234 commented on code in PR #2280: URL: https://github.com/apache/polaris/pull/2280#discussion_r2505113398
########## CHANGELOG.md: ########## @@ -36,6 +36,14 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti [Iceberg Metrics Reporting]: https://iceberg.apache.org/docs/latest/metrics-reporting/ +- **S3 remote request signing** has been added, allowing Polaris to work with S3-compatible object storage systems. + *Remote signing is currently experimental and not enabled by default*. In particular, RBAC checks are currently not + production-ready. One new table privilege was introduced: `TABLE_REMOTE_SIGN`. To enable remote signing: + 1. Set the system-wide property `REMOTE_SIGNING_ENABLED` or the catalog-level `polaris.request-signing.enabled` + property to `true`. + 2. Grant the `TABLE_REMOTE_SIGN` privilege to a catalog role. The role must also be granted the `TABLE_READ_DATA` + and `TABLE_WRITE_DATA` privileges. Review Comment: [doubt] Is there an assertion that whenever `TABLE_REMOTE_SIGN` is granted, these two privileges are also present? nit: ```suggestion 2. Grant the `TABLE_REMOTE_SIGN` privilege to a catalog role. The catalog role must also be granted the `TABLE_READ_DATA` and `TABLE_WRITE_DATA` privileges. ``` ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogHandler.java: ########## @@ -394,6 +394,47 @@ protected void authorizeRenameTableLikeOperationOrThrow( initializeCatalog(); } + protected void authorizeRemoteSigningOrThrow( + EnumSet<PolarisAuthorizableOperation> ops, TableIdentifier identifier) { + + ensureResolutionManifestForTable(identifier); + PolarisResolvedPathWrapper target = + resolutionManifest.getResolvedPath( + identifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE, true); + + // If the table doesn't exist, we still need to check allowed locations from the parent + // namespace. Review Comment: [doubt] Is this a valid state?
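To make the CHANGELOG question above concrete, here is a minimal sketch of the kind of assertion being asked about: a role that holds `TABLE_REMOTE_SIGN` must also hold `TABLE_READ_DATA` and `TABLE_WRITE_DATA`. The `Privilege` enum and `isConsistent` helper are hypothetical stand-ins for illustration only; they are not the Polaris privilege model, and the PR may or may not enforce this anywhere.

```java
import java.util.EnumSet;
import java.util.Set;

final class RemoteSignPrivilegeSketch {

  // Hypothetical stand-in for the three privileges named in the CHANGELOG entry.
  enum Privilege { TABLE_READ_DATA, TABLE_WRITE_DATA, TABLE_REMOTE_SIGN }

  /**
   * Returns true when the grant set is consistent with the CHANGELOG rule:
   * TABLE_REMOTE_SIGN implies both TABLE_READ_DATA and TABLE_WRITE_DATA.
   * (Assumption: this helper is illustrative, not an existing Polaris check.)
   */
  static boolean isConsistent(Set<Privilege> granted) {
    if (!granted.contains(Privilege.TABLE_REMOTE_SIGN)) {
      return true; // nothing to assert when remote signing is not granted
    }
    return granted.contains(Privilege.TABLE_READ_DATA)
        && granted.contains(Privilege.TABLE_WRITE_DATA);
  }

  public static void main(String[] args) {
    // Remote signing granted without the data privileges: inconsistent.
    System.out.println(isConsistent(EnumSet.of(Privilege.TABLE_REMOTE_SIGN)));
    // All three granted: consistent.
    System.out.println(isConsistent(EnumSet.allOf(Privilege.class)));
  }
}
```

If no such assertion exists, the CHANGELOG wording "must also be granted" is only a convention that operators have to follow themselves.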
########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { return responseBuilder; } + private AccessDelegationMode selectAccessDelegationMode( + Set<AccessDelegationMode> delegationModes) { Review Comment: does linking this part of spec https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L1859 helps in putting some context that server is allowed to decide which one to choose ? ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/io/AccessConfigProvider.java: ########## @@ -101,4 +120,58 @@ public AccessConfig getAccessConfig( storageInfo.get(), refreshCredentialsEndpoint); } + + /** + * Generates a remote signing configuration for accessing table storage at explicit locations. + * + * @param callContext the call context containing realm, principal, and security context + * @param catalogName the name of the catalog + * @param tableIdentifier the table identifier, used for logging and refresh endpoint construction + * @param resolvedPath the entity hierarchy to search for storage configuration + * @return {@link AccessConfig} with scoped credentials and metadata; empty if no storage config + * found + */ + public AccessConfig getAccessConfigForRemoteSigning( + @Nonnull CallContext callContext, + @Nonnull String catalogName, + @Nonnull TableIdentifier tableIdentifier, + @Nonnull PolarisResolvedPathWrapper resolvedPath) { + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Fetching remote signing config for table"); + + Optional<PolarisEntity> storageInfo = FileIOUtil.findStorageInfoFromHierarchy(resolvedPath); + Optional<PolarisStorageConfigurationInfo> configurationInfo = + storageInfo + .map(PolarisEntity::getInternalPropertiesAsMap) + .map(info -> info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + 
.map(PolarisStorageConfigurationInfo::deserialize); + + if (configurationInfo.isEmpty()) { + LOGGER + .atWarn() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Table entity has no storage configuration in its hierarchy"); + return AccessConfig.EMPTY; + } + + PolarisStorageIntegration<AwsStorageConfigurationInfo> storageIntegration = + storageIntegrationProvider.getStorageIntegrationForConfig(configurationInfo.get()); + + if (!(storageIntegration + instanceof AwsCredentialsStorageIntegration awsCredentialsStorageIntegration)) { + LOGGER + .atWarn() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Table entity storage integration is not an AWS credentials storage integration"); + return AccessConfig.EMPTY; + } + + String prefix = prefixParser.catalogNameToPrefix(callContext.getRealmContext(), catalogName); + URI signerUri = uriInfo.getBaseUriBuilder().path(PolarisResourcePaths.API_PATH_SEGMENT).build(); Review Comment: can we add a TODO to handle proxies here ? ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { return responseBuilder; } + private AccessDelegationMode selectAccessDelegationMode( + Set<AccessDelegationMode> delegationModes) { + + if (delegationModes.isEmpty()) { + return UNKNOWN; + } + + if (delegationModes.size() == 1) { + return delegationModes.iterator().next(); + } + + if (delegationModes.contains(VENDED_CREDENTIALS) && delegationModes.contains(REMOTE_SIGNING)) { + + boolean skipCredIndirection = + realmConfig.getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION); + + boolean credentialSubscopingAllowed = + baseCatalog instanceof IcebergCatalog Review Comment: If its an Iceberg catalog but doesn't support sts wouldn't we prefer remote-signing then ? 
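A minimal sketch of the fallback this comment is asking about, assuming a hypothetical `stsAvailable` flag that says whether credential subscoping can actually be performed. The `Mode` enum mirrors the `UNKNOWN`, `VENDED_CREDENTIALS` and `REMOTE_SIGNING` values visible in the diff, but the class, the flag and the final fallback branch are illustrative assumptions, not the PR's implementation.

```java
import java.util.EnumSet;
import java.util.Set;

final class DelegationModeSketch {

  // Stand-in for the AccessDelegationMode values referenced in the diff.
  enum Mode { UNKNOWN, VENDED_CREDENTIALS, REMOTE_SIGNING }

  /**
   * Picks one mode from those the client offered.
   * `stsAvailable` is a hypothetical input: true when vended (subscoped)
   * credentials can really be produced for this catalog.
   */
  static Mode select(Set<Mode> offered, boolean stsAvailable) {
    if (offered.isEmpty()) {
      return Mode.UNKNOWN;
    }
    if (offered.size() == 1) {
      return offered.iterator().next();
    }
    if (offered.contains(Mode.VENDED_CREDENTIALS) && offered.contains(Mode.REMOTE_SIGNING)) {
      // Prefer vended credentials only when they can be issued;
      // otherwise fall back to remote signing, as the comment suggests.
      return stsAvailable ? Mode.VENDED_CREDENTIALS : Mode.REMOTE_SIGNING;
    }
    // Assumption: any other combination is treated as unknown in this sketch.
    return Mode.UNKNOWN;
  }

  public static void main(String[] args) {
    Set<Mode> both = EnumSet.of(Mode.VENDED_CREDENTIALS, Mode.REMOTE_SIGNING);
    System.out.println(select(both, true));  // prints VENDED_CREDENTIALS
    System.out.println(select(both, false)); // prints REMOTE_SIGNING
  }
}
```

The point of the sketch is that the preference depends on what the storage backend can do, not only on the catalog type checked by `baseCatalog instanceof IcebergCatalog`.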
########## runtime/service/src/main/java/org/apache/polaris/service/catalog/io/AccessConfigProvider.java: ########## @@ -101,4 +120,58 @@ public AccessConfig getAccessConfig( storageInfo.get(), refreshCredentialsEndpoint); } + + /** + * Generates a remote signing configuration for accessing table storage at explicit locations. + * + * @param callContext the call context containing realm, principal, and security context + * @param catalogName the name of the catalog + * @param tableIdentifier the table identifier, used for logging and refresh endpoint construction + * @param resolvedPath the entity hierarchy to search for storage configuration + * @return {@link AccessConfig} with scoped credentials and metadata; empty if no storage config + * found + */ + public AccessConfig getAccessConfigForRemoteSigning( + @Nonnull CallContext callContext, + @Nonnull String catalogName, + @Nonnull TableIdentifier tableIdentifier, + @Nonnull PolarisResolvedPathWrapper resolvedPath) { + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Fetching remote signing config for table"); + + Optional<PolarisEntity> storageInfo = FileIOUtil.findStorageInfoFromHierarchy(resolvedPath); + Optional<PolarisStorageConfigurationInfo> configurationInfo = + storageInfo + .map(PolarisEntity::getInternalPropertiesAsMap) + .map(info -> info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + .map(PolarisStorageConfigurationInfo::deserialize); + + if (configurationInfo.isEmpty()) { + LOGGER + .atWarn() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Table entity has no storage configuration in its hierarchy"); + return AccessConfig.EMPTY; + } + + PolarisStorageIntegration<AwsStorageConfigurationInfo> storageIntegration = + storageIntegrationProvider.getStorageIntegrationForConfig(configurationInfo.get()); + + if (!(storageIntegration + instanceof AwsCredentialsStorageIntegration awsCredentialsStorageIntegration)) { + LOGGER + .atWarn() + 
.addKeyValue("tableIdentifier", tableIdentifier) + .log("Table entity storage integration is not an AWS credentials storage integration"); + return AccessConfig.EMPTY; + } + + String prefix = prefixParser.catalogNameToPrefix(callContext.getRealmContext(), catalogName); + URI signerUri = uriInfo.getBaseUriBuilder().path(PolarisResourcePaths.API_PATH_SEGMENT).build(); Review Comment: can we add a TODO in code to handle proxies ? ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { return responseBuilder; } + private AccessDelegationMode selectAccessDelegationMode( + Set<AccessDelegationMode> delegationModes) { + + if (delegationModes.isEmpty()) { + return UNKNOWN; + } + + if (delegationModes.size() == 1) { + return delegationModes.iterator().next(); + } + + if (delegationModes.contains(VENDED_CREDENTIALS) && delegationModes.contains(REMOTE_SIGNING)) { + + boolean skipCredIndirection = + realmConfig.getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION); + + boolean credentialSubscopingAllowed = + baseCatalog instanceof IcebergCatalog Review Comment: what if its an IcebergCatalog but doesn't support STS ? ########## runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.storage.s3.sign; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisAuthorizableOperation; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; +import org.apache.polaris.core.storage.InMemoryStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.StorageUtil; +import org.apache.polaris.service.catalog.common.CatalogHandler; +import org.apache.polaris.service.catalog.common.CatalogUtils; +import org.apache.polaris.service.catalog.io.FileIOUtil; +import 
org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3RemoteSigningCatalogHandler extends CatalogHandler implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class); + + private final CallContextCatalogFactory catalogFactory; + private final S3RequestSigner s3RequestSigner; + + private CatalogEntity catalogEntity; + private Catalog baseCatalog; + + public S3RemoteSigningCatalogHandler( + PolarisDiagnostics diagnostics, + CallContext callContext, + ResolutionManifestFactory resolutionManifestFactory, + CallContextCatalogFactory catalogFactory, + PolarisPrincipal polarisPrincipal, + String catalogName, + PolarisAuthorizer authorizer, + S3RequestSigner s3RequestSigner) { + super( + diagnostics, + callContext, + resolutionManifestFactory, + polarisPrincipal, + catalogName, + authorizer, + // external catalogs are not supported for S3 remote signing + null, + null); + this.catalogFactory = catalogFactory; + this.s3RequestSigner = s3RequestSigner; + } + + @Override + protected void initializeCatalog() { + catalogEntity = + CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity()); + if (catalogEntity.isExternal()) { + throw new ForbiddenException("Cannot use S3 remote signing with federated catalogs."); + } + baseCatalog = + catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, resolutionManifest); + } + + public PolarisS3SignResponse signS3Request( + PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) { + + LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, s3SignRequest); + + PolarisAuthorizableOperation authzOp = + s3SignRequest.write() + ? 
PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST + : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST; + + authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier); + + // Must be done after the authorization check, as the auth check creates the catalog entity + throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), catalogEntity); + + var result = + InMemoryStorageIntegration.validateAllowedLocations( + callContext.getRealmConfig(), + getAllowedLocations(tableIdentifier), + getStorageActions(s3SignRequest), + getTargetLocations(s3SignRequest)); + + if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v -> !v.isSuccess()))) { + throw new ForbiddenException("Requested S3 location is not allowed."); + } + + PolarisS3SignResponse s3SignResponse = s3RequestSigner.signRequest(s3SignRequest); + LOGGER.debug("S3 signing response: {}", s3SignResponse); + + return s3SignResponse; + } + + private Collection<String> getAllowedLocations(TableIdentifier tableIdentifier) { + + if (baseCatalog.tableExists(tableIdentifier)) { + + // If the table exists, get allowed locations from the table metadata + Table table = baseCatalog.loadTable(tableIdentifier); + if (table instanceof BaseTable baseTable) { + return StorageUtil.getLocationsUsedByTable(baseTable.operations().current()); + } + + throw new ForbiddenException("No storage configuration found for table."); + + } else { + + // If the table or view doesn't exist, the engine might be writing the manifests before the + // table creation is committed. In this case, we still need to check allowed locations from + // the parent entities. Review Comment: Is this the staged-table case? I'm wondering whether we should block it, because the table's location may differ from the namespace location.
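The staged-table concern above can be illustrated with a small prefix check: if only the parent namespace's allowed locations are consulted, a staged table whose location lies elsewhere would not match them. This is a minimal sketch with hypothetical names and example URIs; it is not how `InMemoryStorageIntegration.validateAllowedLocations` is implemented.

```java
import java.util.Collection;
import java.util.List;

final class AllowedLocationSketch {

  /**
   * Returns true if `target` equals an allowed location or lies beneath it.
   * A trailing slash is appended before the prefix test so that
   * ".../ns1" does not accidentally match ".../ns10".
   */
  static boolean isUnderAllowedLocation(String target, Collection<String> allowed) {
    for (String base : allowed) {
      String prefix = base.endsWith("/") ? base : base + "/";
      if (target.equals(base) || target.startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Hypothetical allowed locations taken from the parent namespace.
    List<String> namespaceLocations = List.of("s3://bucket/warehouse/ns1");

    // Staged table under the namespace location: accepted.
    System.out.println(
        isUnderAllowedLocation("s3://bucket/warehouse/ns1/t1/metadata/m0.avro", namespaceLocations));

    // Staged table with a custom location outside the namespace: rejected.
    System.out.println(
        isUnderAllowedLocation("s3://bucket/other/t1/data/f0.parquet", namespaceLocations));
  }
}
```

Whether rejecting the second case is the desired behaviour, or whether the request should be blocked outright until the table is committed, is the open question in the comment.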
########## runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.storage.s3.sign; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisAuthorizableOperation; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; 
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; +import org.apache.polaris.core.storage.InMemoryStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.StorageUtil; +import org.apache.polaris.service.catalog.common.CatalogHandler; +import org.apache.polaris.service.catalog.common.CatalogUtils; +import org.apache.polaris.service.catalog.io.FileIOUtil; +import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3RemoteSigningCatalogHandler extends CatalogHandler implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class); + + private final CallContextCatalogFactory catalogFactory; + private final S3RequestSigner s3RequestSigner; + + private CatalogEntity catalogEntity; + private Catalog baseCatalog; + + public S3RemoteSigningCatalogHandler( + PolarisDiagnostics diagnostics, + CallContext callContext, + ResolutionManifestFactory resolutionManifestFactory, + CallContextCatalogFactory catalogFactory, + PolarisPrincipal polarisPrincipal, + String catalogName, + PolarisAuthorizer authorizer, + S3RequestSigner s3RequestSigner) { + super( + diagnostics, + callContext, + resolutionManifestFactory, + polarisPrincipal, + catalogName, + authorizer, + // external catalogs are not supported for S3 remote signing + null, + null); + this.catalogFactory = catalogFactory; + this.s3RequestSigner = s3RequestSigner; + } + + @Override + protected void initializeCatalog() { + catalogEntity = + CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity()); + if (catalogEntity.isExternal()) { 
+ throw new ForbiddenException("Cannot use S3 remote signing with federated catalogs."); + } + baseCatalog = + catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, resolutionManifest); + } + + public PolarisS3SignResponse signS3Request( + PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) { + + LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, s3SignRequest); + + PolarisAuthorizableOperation authzOp = + s3SignRequest.write() + ? PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST + : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST; + + authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier); + + // Must be done after the authorization check, as the auth check creates the catalog entity + throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), catalogEntity); + + var result = + InMemoryStorageIntegration.validateAllowedLocations( + callContext.getRealmConfig(), + getAllowedLocations(tableIdentifier), + getStorageActions(s3SignRequest), + getTargetLocations(s3SignRequest)); + + if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v -> !v.isSuccess()))) { + throw new ForbiddenException("Requested S3 location is not allowed."); + } + + PolarisS3SignResponse s3SignResponse = s3RequestSigner.signRequest(s3SignRequest); + LOGGER.debug("S3 signing response: {}", s3SignResponse); + + return s3SignResponse; + } + + private Collection<String> getAllowedLocations(TableIdentifier tableIdentifier) { + + if (baseCatalog.tableExists(tableIdentifier)) { + + // If the table exists, get allowed locations from the table metadata + Table table = baseCatalog.loadTable(tableIdentifier); + if (table instanceof BaseTable baseTable) { + return StorageUtil.getLocationsUsedByTable(baseTable.operations().current()); + } + + throw new ForbiddenException("No storage configuration found for table."); + + } else { + + // If the table or view doesn't exist, the engine might be writing the manifests before the + 
// table creation is committed. In this case, we still need to check allowed locations from + // the parent entities. + + PolarisResolvedPathWrapper resolvedPath = + CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier); + + Optional<PolarisEntity> storageInfo = FileIOUtil.findStorageInfoFromHierarchy(resolvedPath); + + var configurationInfo = + storageInfo + .map(PolarisEntity::getInternalPropertiesAsMap) + .map(info -> info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + .map(PolarisStorageConfigurationInfo::deserialize); + + if (configurationInfo.isEmpty()) { + throw new ForbiddenException("No storage configuration found for table."); + } + + return configurationInfo.get().getAllowedLocations(); + } + } + + private Set<PolarisStorageActions> getStorageActions(PolarisS3SignRequest s3SignRequest) { + // TODO M2: better handling of DELETE and LIST + return s3SignRequest.write() + ? Set.of(PolarisStorageActions.WRITE) + : Set.of(PolarisStorageActions.READ); + } + + private Set<String> getTargetLocations(PolarisS3SignRequest s3SignRequest) { + // TODO M2: map http URI to s3 URI + return Set.of(); + } + + public static void throwIfRemoteSigningNotEnabled( + RealmConfig realmConfig, CatalogEntity catalogEntity) { + if (catalogEntity.isExternal()) { + throw new ForbiddenException("Remote signing is not enabled for external catalogs."); + } Review Comment: I understand we discussed before that remote signing should be disabled for federated catalogs, but at that time we didn't support credential vending for them either. I wonder if it's a good time to reconsider and make remote signing possible for them too (in a new PR, of course). ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { return responseBuilder; } + private AccessDelegationMode selectAccessDelegationMode( Review Comment: 
would it be helpful to add a comment here and link irc spec here to pick the delegation mode they prefer ? ########## runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.storage.s3.sign; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisAuthorizableOperation; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; +import org.apache.polaris.core.storage.InMemoryStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.StorageUtil; +import org.apache.polaris.service.catalog.common.CatalogHandler; +import org.apache.polaris.service.catalog.common.CatalogUtils; +import org.apache.polaris.service.catalog.io.FileIOUtil; +import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3RemoteSigningCatalogHandler extends CatalogHandler implements AutoCloseable { + + private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class); + + private final CallContextCatalogFactory catalogFactory; + private final S3RequestSigner s3RequestSigner; + + private CatalogEntity catalogEntity; + private Catalog baseCatalog; + + public S3RemoteSigningCatalogHandler( + PolarisDiagnostics diagnostics, + CallContext callContext, + ResolutionManifestFactory resolutionManifestFactory, + CallContextCatalogFactory catalogFactory, + PolarisPrincipal polarisPrincipal, + String catalogName, + PolarisAuthorizer authorizer, + S3RequestSigner s3RequestSigner) { + super( + diagnostics, + callContext, + resolutionManifestFactory, + polarisPrincipal, + catalogName, + authorizer, + // external catalogs are not supported for S3 remote signing + null, + null); + this.catalogFactory = catalogFactory; + this.s3RequestSigner = s3RequestSigner; + } + + @Override + protected void initializeCatalog() { + catalogEntity = + CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity()); + if (catalogEntity.isExternal()) { + throw new ForbiddenException("Cannot use S3 remote signing with federated catalogs."); + } + baseCatalog = + catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, resolutionManifest); + } + + public PolarisS3SignResponse signS3Request( + PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) { + + LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, s3SignRequest); + + PolarisAuthorizableOperation authzOp = + s3SignRequest.write() + ? 
PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST + : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST; + + authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier); + + // Must be done after the authorization check, as the auth check creates the catalog entity + throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), catalogEntity); + + var result = + InMemoryStorageIntegration.validateAllowedLocations( + callContext.getRealmConfig(), + getAllowedLocations(tableIdentifier), + getStorageActions(s3SignRequest), + getTargetLocations(s3SignRequest)); + + if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v -> !v.isSuccess()))) { + throw new ForbiddenException("Requested S3 location is not allowed."); + } + + PolarisS3SignResponse s3SignResponse = s3RequestSigner.signRequest(s3SignRequest); + LOGGER.debug("S3 signing response: {}", s3SignResponse); + + return s3SignResponse; + } + + private Collection<String> getAllowedLocations(TableIdentifier tableIdentifier) { + + if (baseCatalog.tableExists(tableIdentifier)) { + + // If the table exists, get allowed locations from the table metadata + Table table = baseCatalog.loadTable(tableIdentifier); Review Comment: A loadTable call for each SIGN request is very expensive; if someone accidentally enables this in prod for a table with millions of files, the service might become unresponsive. [suggestion / optional to address] For now, should we add a production readiness check to keep this disabled in prod? ########## runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.storage.s3.sign; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisAuthorizableOperation; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; +import org.apache.polaris.core.storage.InMemoryStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.StorageUtil; +import 
org.apache.polaris.service.catalog.common.CatalogHandler; +import org.apache.polaris.service.catalog.common.CatalogUtils; +import org.apache.polaris.service.catalog.io.FileIOUtil; +import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3RemoteSigningCatalogHandler extends CatalogHandler implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class); + + private final CallContextCatalogFactory catalogFactory; + private final S3RequestSigner s3RequestSigner; + + private CatalogEntity catalogEntity; + private Catalog baseCatalog; + + public S3RemoteSigningCatalogHandler( + PolarisDiagnostics diagnostics, + CallContext callContext, + ResolutionManifestFactory resolutionManifestFactory, + CallContextCatalogFactory catalogFactory, + PolarisPrincipal polarisPrincipal, + String catalogName, + PolarisAuthorizer authorizer, + S3RequestSigner s3RequestSigner) { + super( + diagnostics, + callContext, + resolutionManifestFactory, + polarisPrincipal, + catalogName, + authorizer, + // external catalogs are not supported for S3 remote signing + null, + null); + this.catalogFactory = catalogFactory; + this.s3RequestSigner = s3RequestSigner; + } + + @Override + protected void initializeCatalog() { + catalogEntity = + CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity()); + if (catalogEntity.isExternal()) { + throw new ForbiddenException("Cannot use S3 remote signing with federated catalogs."); + } + baseCatalog = + catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, resolutionManifest); + } + + public PolarisS3SignResponse signS3Request( + PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) { + + 
LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, s3SignRequest);
Review Comment: Do we need to log this explicitly here, or does the Quarkus access log already cover it? I ask because I don't see such logging in `IcebergCatalogHandler`.
########## runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.storage.s3.sign; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisAuthorizableOperation; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; +import org.apache.polaris.core.storage.InMemoryStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.StorageUtil; +import org.apache.polaris.service.catalog.common.CatalogHandler; +import org.apache.polaris.service.catalog.common.CatalogUtils; +import org.apache.polaris.service.catalog.io.FileIOUtil; +import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest; +import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3RemoteSigningCatalogHandler extends CatalogHandler implements AutoCloseable { + + private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class); + + private final CallContextCatalogFactory catalogFactory; + private final S3RequestSigner s3RequestSigner; + + private CatalogEntity catalogEntity; + private Catalog baseCatalog; + + public S3RemoteSigningCatalogHandler( + PolarisDiagnostics diagnostics, + CallContext callContext, + ResolutionManifestFactory resolutionManifestFactory, + CallContextCatalogFactory catalogFactory, + PolarisPrincipal polarisPrincipal, + String catalogName, + PolarisAuthorizer authorizer, + S3RequestSigner s3RequestSigner) { + super( + diagnostics, + callContext, + resolutionManifestFactory, + polarisPrincipal, + catalogName, + authorizer, + // external catalogs are not supported for S3 remote signing + null, + null); + this.catalogFactory = catalogFactory; + this.s3RequestSigner = s3RequestSigner; + } + + @Override + protected void initializeCatalog() { + catalogEntity = + CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity()); + if (catalogEntity.isExternal()) { + throw new ForbiddenException("Cannot use S3 remote signing with federated catalogs."); + } + baseCatalog = + catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, resolutionManifest); + } + + public PolarisS3SignResponse signS3Request( + PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) { + + LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, s3SignRequest); + + PolarisAuthorizableOperation authzOp = + s3SignRequest.write() + ? 
PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST + : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST; + + authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier); + + // Must be done after the authorization check, as the auth check creates the catalog entity + throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), catalogEntity); + + var result = + InMemoryStorageIntegration.validateAllowedLocations( + callContext.getRealmConfig(), + getAllowedLocations(tableIdentifier), + getStorageActions(s3SignRequest), + getTargetLocations(s3SignRequest)); + + if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v -> !v.isSuccess()))) { + throw new ForbiddenException("Requested S3 location is not allowed."); + } + + PolarisS3SignResponse s3SignResponse = s3RequestSigner.signRequest(s3SignRequest); + LOGGER.debug("S3 signing response: {}", s3SignResponse); + + return s3SignResponse; + } + + private Collection<String> getAllowedLocations(TableIdentifier tableIdentifier) { + + if (baseCatalog.tableExists(tableIdentifier)) { + + // If the table exists, get allowed locations from the table metadata + Table table = baseCatalog.loadTable(tableIdentifier); + if (table instanceof BaseTable baseTable) { + return StorageUtil.getLocationsUsedByTable(baseTable.operations().current()); + } + + throw new ForbiddenException("No storage configuration found for table."); + + } else { + + // If the table or view doesn't exist, the engine might be writing the manifests before the + // table creation is committed. In this case, we still need to check allowed locations from + // the parent entities. 
+ + PolarisResolvedPathWrapper resolvedPath = + CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier); + + Optional<PolarisEntity> storageInfo = FileIOUtil.findStorageInfoFromHierarchy(resolvedPath); + + var configurationInfo = + storageInfo + .map(PolarisEntity::getInternalPropertiesAsMap) + .map(info -> info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + .map(PolarisStorageConfigurationInfo::deserialize); + + if (configurationInfo.isEmpty()) { + throw new ForbiddenException("No storage configuration found for table."); + } + + return configurationInfo.get().getAllowedLocations(); + } + } + + private Set<PolarisStorageActions> getStorageActions(PolarisS3SignRequest s3SignRequest) { + // TODO M2: better handling of DELETE and LIST
Review Comment: [for my understanding] Can you please elaborate on this case?
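For readers following the location check in `signS3Request` quoted in the diff above, the nested `anyMatch` can be tried out in isolation. This is only a minimal sketch: `LocationCheckSketch`, `anyDenied`, and the local `ValidationResult` class are hypothetical stand-ins, and the `Map<String, Map<String, ValidationResult>>` shape (location, then action, then result) is an assumption about what `validateAllowedLocations` returns, not something the quoted diff confirms.

```java
import java.util.Map;

final class LocationCheckSketch {

  // Hypothetical stand-in for the per-action validation result used in the PR.
  static final class ValidationResult {
    private final boolean success;

    ValidationResult(boolean success) {
      this.success = success;
    }

    boolean isSuccess() {
      return success;
    }
  }

  // Returns true if at least one (location, action) pair failed validation.
  // Mirrors the nested anyMatch expression from the quoted signS3Request method.
  static boolean anyDenied(Map<String, Map<String, ValidationResult>> result) {
    return result.values().stream()
        .anyMatch(perAction -> perAction.values().stream().anyMatch(v -> !v.isSuccess()));
  }

  public static void main(String[] args) {
    // All requested locations pass validation.
    Map<String, Map<String, ValidationResult>> allAllowed =
        Map.of("s3://bucket/table/data/f1.parquet", Map.of("READ", new ValidationResult(true)));

    // One location outside the allowed prefixes fails validation.
    Map<String, Map<String, ValidationResult>> oneDenied =
        Map.of(
            "s3://bucket/table/data/f1.parquet", Map.of("WRITE", new ValidationResult(true)),
            "s3://other-bucket/f2.parquet", Map.of("WRITE", new ValidationResult(false)));

    System.out.println(anyDenied(allAllowed));
    System.out.println(anyDenied(oneDenied));
    System.out.println(anyDenied(Map.of()));
  }
}
```

In this sketch an empty result map counts as "nothing denied". Whether the real method can return an empty map for a signing request is not visible in the quoted hunk, so that case may deserve an explicit check or a test.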
