flyrain commented on code in PR #3734:
URL: https://github.com/apache/polaris/pull/3734#discussion_r3228874220


##########
integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java:
##########
@@ -708,15 +708,101 @@ public void 
testCreateTableWithOverriddenBaseLocationMustResideInNsDirectory() {
         .isInstanceOf(ForbiddenException.class);
   }
 
+  /**
+   * Create an INTERNAL catalog. Register a table WITH access delegation and 
verify that the
+   * registerTable response contains vended credentials (when supported by the 
storage type).
+   */
+  @RestCatalogConfig({"header.X-Iceberg-Access-Delegation", 
"vended-credentials"})
+  @Test
+  public void testRegisterTableWithAccessDelegation() {
+    Namespace ns1 = Namespace.of("ns1");
+    restCatalog.createNamespace(ns1);
+    TableMetadata tableMetadata =
+        TableMetadata.newTableMetadata(
+            new Schema(List.of(Types.NestedField.required(1, "col1", new 
Types.StringType()))),
+            PartitionSpec.unpartitioned(),
+            catalogBaseLocation + "/ns1/my_table",
+            Map.of());
+    try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
+      initializeClientFileIO(resolvingFileIO);
+      resolvingFileIO.setConf(new Configuration());
+      String fileLocation = catalogBaseLocation + 
"/ns1/my_table/metadata/v1.metadata.json";
+      TableMetadataParser.write(tableMetadata, 
resolvingFileIO.newOutputFile(fileLocation));
+      try {
+        LoadTableResponse registerResponse =
+            catalogApi.registerTableWithAccessDelegation(
+                currentCatalogName, ns1, "my_table", fileLocation);
+
+        assertThat(registerResponse).isNotNull();
+        assertThat(registerResponse.tableMetadata()).isNotNull();
+        
assertThat(registerResponse.tableMetadata().location()).isEqualTo(tableMetadata.location());
+        
assertThat(registerResponse.metadataLocation()).isEqualTo(fileLocation);
+
+        if (getStorageConfigInfo().getStorageType() != 
StorageConfigInfo.StorageTypeEnum.FILE) {
+          assertThat(registerResponse.credentials())
+              .as("Cloud storage should vend credentials")
+              .isNotEmpty();
+        }
+      } finally {
+        resolvingFileIO.deleteFile(fileLocation);
+      }
+    }
+  }
+
+  /**
+   * Create an EXTERNAL catalog with credential vending enabled. Register a 
table WITH access
+   * delegation and verify that the registerTable response contains vended 
credentials (when
+   * supported by the storage type).
+   */
+  @CatalogConfig(
+      value = Catalog.TypeEnum.EXTERNAL,
+      properties = {"polaris.config.enable.credential.vending", "true"})
+  @RestCatalogConfig({"header.X-Iceberg-Access-Delegation", 
"vended-credentials"})
+  @Test
+  public void testRegisterTableWithAccessDelegationForExternalCatalog() {
+    Namespace ns1 = Namespace.of("ns1");
+    restCatalog.createNamespace(ns1);
+    TableMetadata tableMetadata =
+        TableMetadata.newTableMetadata(
+            new Schema(List.of(Types.NestedField.required(1, "col1", new 
Types.StringType()))),
+            PartitionSpec.unpartitioned(),
+            externalCatalogBaseLocation + "/ns1/my_table",
+            Map.of());
+    try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
+      initializeClientFileIO(resolvingFileIO);
+      resolvingFileIO.setConf(new Configuration());
+      String fileLocation = externalCatalogBaseLocation + 
"/ns1/my_table/metadata/v1.metadata.json";
+      TableMetadataParser.write(tableMetadata, 
resolvingFileIO.newOutputFile(fileLocation));
+      try {
+        LoadTableResponse registerResponse =
+            catalogApi.registerTableWithAccessDelegation(
+                currentCatalogName, ns1, "my_table", fileLocation);
+
+        assertThat(registerResponse).isNotNull();
+        assertThat(registerResponse.tableMetadata()).isNotNull();
+        
assertThat(registerResponse.tableMetadata().location()).isEqualTo(tableMetadata.location());
+        
assertThat(registerResponse.metadataLocation()).isEqualTo(fileLocation);
+
+        if (getStorageConfigInfo().getStorageType() != 
StorageConfigInfo.StorageTypeEnum.FILE) {
+          assertThat(registerResponse.credentials())
+              .as("Cloud storage should vend credentials")
+              .isNotEmpty();
+        }
+      } finally {
+        resolvingFileIO.deleteFile(fileLocation);
+      }
+    }
+  }
+
   /**
    * Create an EXTERNAL catalog. The test configuration, by default, disables 
access delegation for
-   * EXTERNAL catalogs, so register a table and try to load it with the REST 
client configured to
-   * try to fetch vended credentials. Expect a ForbiddenException.
+   * EXTERNAL catalogs, so try to register a table with the REST client 
configured to try to fetch
+   * vended credentials. Expect a ForbiddenException.
    */
   @CatalogConfig(Catalog.TypeEnum.EXTERNAL)
   @RestCatalogConfig({"header.X-Iceberg-Access-Delegation", 
"vended-credentials"})
   @Test
-  public void 
testLoadTableWithAccessDelegationForExternalCatalogWithConfigDisabled() {
+  public void 
testRegisterTableWithAccessDelegationForExternalCatalogWithConfigDisabled() {

Review Comment:
   This rename means the loadTable + delegation-header + external + 
config-disabled scenario no longer has a dedicated test. 
`testLoadTableWithoutAccessDelegationForExternalCatalogWithConfigDisabled` (no 
header) doesn't cover it. Consider keeping a loadTable variant alongside the 
new register one.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -600,26 +600,50 @@ public LoadTableResponse createTableStaged(
    *
    * @param namespace The namespace to register the table in
    * @param request the register table request
+   * @param delegationModes the access delegation modes to use
+   * @param refreshCredentialsEndpoint the refresh credentials endpoint to use
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
-  public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
+  public LoadTableResponse registerTable(
+      Namespace namespace,
+      RegisterTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<String> refreshCredentialsEndpoint) {
+
+    request.validate();
     TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
     boolean overwrite = request.overwrite();
 
+    Set<PolarisStorageActions> actionsRequested =
+        authorizeRegisterTable(identifier, delegationModes, overwrite);
+
     if (overwrite) {
-      authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-          PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE, identifier);
-      return registerTableWithOverwrite(identifier, request);
+      return registerTableWithOverwrite(
+          identifier, request, delegationModes, actionsRequested, 
refreshCredentialsEndpoint);
     }
 
-    // Creating new table requires REGISTER_TABLE privilege
-    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
-    return catalogHandlerUtils().registerTable(baseCatalog, namespace, 
request);
+    Table table = baseCatalog.registerTable(identifier, 
request.metadataLocation());
+
+    if (table instanceof BaseTable baseTable) {
+      TableMetadata tableMetadata = baseTable.operations().current();
+      return buildLoadTableResponseWithDelegationCredentials(
+              identifier,
+              tableMetadata,
+              delegationModes,
+              actionsRequested,
+              refreshCredentialsEndpoint)
+          .build();
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");

Review Comment:
   This message is confusing. How about the following:
   ```
     throw new IllegalStateException(
         "Expected a BaseTable object from registerTable() for %s, got: %s"
             .formatted(identifier, table.getClass().getName()));
   ```
   
   It'd be nice to fix this in this PR, but it's not a blocker since this PR 
follows the previous pattern.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -631,7 +655,9 @@ private LoadTableResponse registerTableWithOverwrite(
       Table table = icebergCatalog.registerTable(identifier, 
request.metadataLocation(), true);
       if (table instanceof BaseTable baseTable) {
         TableMetadata metadata = baseTable.operations().current();
-        return LoadTableResponse.builder().withTableMetadata(metadata).build();
+        return buildLoadTableResponseWithDelegationCredentials(
+                identifier, metadata, delegationModes, actionsRequested, 
refreshCredentialsEndpoint)
+            .build();
       }
       throw new IllegalStateException("Cannot wrap catalog that does not 
produce BaseTable");

Review Comment:
   This message is confusing. How about the following:
   ```
     throw new IllegalStateException(
         "Expected a BaseTable object from registerTable() for %s, got: %s"
             .formatted(identifier, table.getClass().getName()));
   ```
   
   Not a blocker since this PR doesn't touch it.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -808,6 +834,44 @@ private Set<PolarisStorageActions> authorizeLoadTable(
     return actionsRequested;
   }
 
+  private Set<PolarisStorageActions> authorizeRegisterTable(
+      TableIdentifier tableIdentifier,
+      EnumSet<AccessDelegationMode> delegationModes,
+      boolean overwrite) {
+
+    if (delegationModes.isEmpty()) {
+      PolarisAuthorizableOperation op =
+          overwrite
+              ? PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE
+              : PolarisAuthorizableOperation.REGISTER_TABLE;
+      authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, 
tableIdentifier);
+      return Set.of();
+    }
+
+    PolarisAuthorizableOperation readOp =
+        overwrite
+            ? 
PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE_WITH_READ_DELEGATION
+            : PolarisAuthorizableOperation.REGISTER_TABLE_WITH_READ_DELEGATION;
+    PolarisAuthorizableOperation writeOp =
+        overwrite
+            ? 
PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE_WITH_WRITE_DELEGATION
+            : 
PolarisAuthorizableOperation.REGISTER_TABLE_WITH_WRITE_DELEGATION;
+
+    Set<PolarisStorageActions> actionsRequested =
+        EnumSet.of(PolarisStorageActions.READ, PolarisStorageActions.LIST);
+
+    try {
+      authorizeCreateTableLikeUnderNamespaceOperationOrThrow(writeOp, 
tableIdentifier);
+      actionsRequested.add(PolarisStorageActions.WRITE);
+    } catch (ForbiddenException e) {
+      authorizeCreateTableLikeUnderNamespaceOperationOrThrow(readOp, 
tableIdentifier);
+    }
+
+    checkAllowExternalCatalogCredentialVending(delegationModes);

Review Comment:
   A question: do we support registering a table in remote catalogs?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -600,26 +600,50 @@ public LoadTableResponse createTableStaged(
    *
    * @param namespace The namespace to register the table in
    * @param request the register table request
+   * @param delegationModes the access delegation modes to use
+   * @param refreshCredentialsEndpoint the refresh credentials endpoint to use
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
-  public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
+  public LoadTableResponse registerTable(
+      Namespace namespace,
+      RegisterTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<String> refreshCredentialsEndpoint) {
+
+    request.validate();

Review Comment:
   I'd prefer to move it to the adapter layer, but since createTable uses the 
same pattern, I'm OK with changing it as a follow-up.



##########
polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java:
##########
@@ -129,6 +129,10 @@ public enum PolarisAuthorizableOperation {
   CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_CREATE, 
TABLE_WRITE_DATA)),
   REGISTER_TABLE(TABLE_CREATE),
   REGISTER_TABLE_OVERWRITE(TABLE_FULL_METADATA),
+  REGISTER_TABLE_WITH_READ_DELEGATION(EnumSet.of(TABLE_CREATE, 
TABLE_READ_DATA)),
+  REGISTER_TABLE_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_CREATE, 
TABLE_WRITE_DATA)),
+  
REGISTER_TABLE_OVERWRITE_WITH_READ_DELEGATION(EnumSet.of(TABLE_FULL_METADATA, 
TABLE_READ_DATA)),
+  
REGISTER_TABLE_OVERWRITE_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_FULL_METADATA, 
TABLE_WRITE_DATA)),

Review Comment:
   These look correct to me; they are also consistent with the table creation 
operations. cc @collado-mike @dennishuo @singhpk234 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to