bharos commented on code in PR #10675:
URL: https://github.com/apache/gravitino/pull/10675#discussion_r3043315545


##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTransactionOperations.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.rest;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.google.common.annotations.VisibleForTesting;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
+import org.apache.gravitino.iceberg.service.IcebergRESTUtils;
+import 
org.apache.gravitino.iceberg.service.dispatcher.IcebergTransactionOperationDispatcher;
+import org.apache.gravitino.listener.api.event.IcebergRequestContext;
+import org.apache.gravitino.metrics.MetricNames;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
+import org.apache.gravitino.server.web.Utils;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Handles {@code POST /v1/{prefix}/transactions/commit} for atomic 
multi-table commits. */
+@Path("/v1/{prefix:([^/]*/)?}transactions")
+@Consumes(MediaType.APPLICATION_JSON)
+@Produces(MediaType.APPLICATION_JSON)
+public class IcebergTransactionOperations {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTransactionOperations.class);
+
+  @Context private HttpServletRequest httpRequest;
+
+  private final IcebergTransactionOperationDispatcher 
transactionOperationDispatcher;
+
+  @Inject
+  public IcebergTransactionOperations(
+      IcebergTransactionOperationDispatcher transactionOperationDispatcher) {
+    this.transactionOperationDispatcher = transactionOperationDispatcher;
+  }
+
+  @POST
+  @Path("commit")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Timed(name = "commit-transaction." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "commit-transaction", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER, METALAKE, CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG",
+      accessMetadataType = MetadataObject.Type.CATALOG)

Review Comment:
   This checks only catalog-level permissions (`ANY_USE_CATALOG`), whereas the 
`updateTable` endpoint requires table-level permissions (`TABLE::OWNER || 
ANY_MODIFY_TABLE`). As a result, a user with only `USE_CATALOG` could modify 
tables via `commitTransaction` that they could not modify via `updateTable`. 
Consider adding per-table authorization checks within the dispatcher chain.



##########
iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java:
##########
@@ -251,6 +257,70 @@ public LoadTableResponse updateTable(IcebergTableChange 
icebergTableChange) {
     return loadTable(icebergTableChange.getTableIdentifier());
   }
 
+  /**
+   * Commits multiple table updates in a best-effort atomic manner using a 
two-phase
+   * validate-then-commit approach:
+   *
+   * <ol>
+   *   <li>Phase 1: Load all tables and validate ALL requirements against 
current metadata. If any
+   *       requirement fails, the entire transaction is rejected before any 
commit is made.
+   *   <li>Phase 2: Apply metadata updates and commit each table. Once phase 1 
succeeds, commits are
+   *       applied sequentially.
+   * </ol>
+   *
+   * <p>True cross-table atomicity (rollback of already-committed tables) 
depends on whether the
+   * underlying catalog backend supports it. Most Iceberg catalog backends do 
not provide
+   * cross-table rollback, but this two-phase approach ensures that no table 
is modified if any
+   * pre-condition check fails.
+   *
+   * <p>Each {@link UpdateTableRequest} in the request must include a {@link 
TableIdentifier}.
+   *
+   * @param commitTransactionRequest The request containing all table changes 
to apply.
+   */
+  public void commitTransaction(CommitTransactionRequest 
commitTransactionRequest) {
+    commitTransactionRequest.validate();
+    List<UpdateTableRequest> tableChanges = 
commitTransactionRequest.tableChanges();
+
+    // Phase 1: validate ALL requirements before committing anything
+    List<org.apache.iceberg.TableOperations> allOps = new 
ArrayList<>(tableChanges.size());

Review Comment:
   Nit: `org.apache.iceberg.TableOperations` is used as a fully qualified name 
inline. Per project conventions (AGENTS.md), this should use a standard 
`import` statement — no naming conflict here.



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTransactionOperations.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.rest;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import org.apache.gravitino.listener.api.event.IcebergCommitTransactionEvent;
+import 
org.apache.gravitino.listener.api.event.IcebergCommitTransactionFailureEvent;
+import 
org.apache.gravitino.listener.api.event.IcebergCommitTransactionPreEvent;
+import org.apache.gravitino.server.ServerConfig;
+import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.glassfish.jersey.internal.inject.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+@SuppressWarnings("deprecation")
+public class TestIcebergTransactionOperations extends IcebergNamespaceTestBase 
{
+
+  private static final Schema initialSchema =
+      new Schema(NestedField.of(1, false, "col_a", StringType.get()));
+
+  private static final Schema updatedSchema =
+      new Schema(
+          NestedField.of(1, false, "col_a", StringType.get()),
+          NestedField.of(2, true, "col_b", StringType.get()));
+
+  private DummyEventListener dummyEventListener;
+
+  @Override
+  protected Application configure() {
+    this.dummyEventListener = new DummyEventListener();
+    ResourceConfig resourceConfig =
+        IcebergRestTestUtil.getIcebergResourceConfig(
+            MockIcebergTransactionOperations.class, true, 
Arrays.asList(dummyEventListener));
+    resourceConfig.register(MockIcebergNamespaceOperations.class);
+    resourceConfig.register(MockIcebergTableOperations.class);
+    resourceConfig.register(MockIcebergTableRenameOperations.class);
+
+    resourceConfig.register(
+        new AbstractBinder() {
+          @Override
+          protected void configure() {
+            HttpServletRequest mockRequest = 
Mockito.mock(HttpServletRequest.class);
+            Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> 
"test-user");
+            bind(mockRequest).to(HttpServletRequest.class);
+          }
+        });
+
+    GravitinoAuthorizerProvider.getInstance().initialize(new ServerConfig());
+    return resourceConfig;
+  }
+
+  @ParameterizedTest
+  
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+  void testCommitTransactionSuccess(Namespace namespace) {
+    verifyCreateNamespaceSucc(namespace);
+    createTable(namespace, "txn_table1");
+    createTable(namespace, "txn_table2");
+
+    TableMetadata meta1 = loadTableMeta(namespace, "txn_table1");
+    TableMetadata meta2 = loadTableMeta(namespace, "txn_table2");
+
+    CommitTransactionRequest request = buildCommitRequest(namespace, meta1, 
meta2);
+    Response response = doCommitTransaction(request);
+
+    Assertions.assertEquals(Status.NO_CONTENT.getStatusCode(), 
response.getStatus());
+    Assertions.assertTrue(
+        dummyEventListener.popPreEvent() instanceof 
IcebergCommitTransactionPreEvent);
+    Assertions.assertTrue(
+        dummyEventListener.popPostEvent() instanceof 
IcebergCommitTransactionEvent);
+  }
+
+  @ParameterizedTest
+  
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+  void testCommitTransactionTableNotFound(Namespace namespace) {
+    verifyCreateNamespaceSucc(namespace);
+
+    TableIdentifier missingId = TableIdentifier.of(namespace, 
"does_not_exist");
+    UpdateTableRequest change =
+        UpdateTableRequest.create(
+            missingId, List.of(), List.of(new 
MetadataUpdate.AddSchema(updatedSchema)));
+    CommitTransactionRequest request = new 
CommitTransactionRequest(List.of(change));
+    Response response = doCommitTransaction(request);
+
+    Assertions.assertEquals(Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    Assertions.assertTrue(
+        dummyEventListener.popPreEvent() instanceof 
IcebergCommitTransactionPreEvent);
+    Assertions.assertTrue(
+        dummyEventListener.popPostEvent() instanceof 
IcebergCommitTransactionFailureEvent);
+  }

Review Comment:
   Consider adding a test for *requirement validation failure* — e.g., two 
tables where one has a stale UUID. This is the core guarantee of the two-phase 
approach: no table should be modified when any pre-condition fails.



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTransactionOperations.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.rest;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import org.apache.gravitino.listener.api.event.IcebergCommitTransactionEvent;
+import 
org.apache.gravitino.listener.api.event.IcebergCommitTransactionFailureEvent;
+import 
org.apache.gravitino.listener.api.event.IcebergCommitTransactionPreEvent;
+import org.apache.gravitino.server.ServerConfig;
+import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.glassfish.jersey.internal.inject.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+@SuppressWarnings("deprecation")
+public class TestIcebergTransactionOperations extends IcebergNamespaceTestBase 
{
+
+  private static final Schema initialSchema =
+      new Schema(NestedField.of(1, false, "col_a", StringType.get()));
+
+  private static final Schema updatedSchema =
+      new Schema(
+          NestedField.of(1, false, "col_a", StringType.get()),
+          NestedField.of(2, true, "col_b", StringType.get()));
+
+  private DummyEventListener dummyEventListener;
+
+  @Override
+  protected Application configure() {
+    this.dummyEventListener = new DummyEventListener();
+    ResourceConfig resourceConfig =
+        IcebergRestTestUtil.getIcebergResourceConfig(
+            MockIcebergTransactionOperations.class, true, 
Arrays.asList(dummyEventListener));
+    resourceConfig.register(MockIcebergNamespaceOperations.class);
+    resourceConfig.register(MockIcebergTableOperations.class);
+    resourceConfig.register(MockIcebergTableRenameOperations.class);
+
+    resourceConfig.register(
+        new AbstractBinder() {
+          @Override
+          protected void configure() {
+            HttpServletRequest mockRequest = 
Mockito.mock(HttpServletRequest.class);
+            Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> 
"test-user");
+            bind(mockRequest).to(HttpServletRequest.class);
+          }
+        });
+
+    GravitinoAuthorizerProvider.getInstance().initialize(new ServerConfig());
+    return resourceConfig;
+  }
+
+  @ParameterizedTest
+  
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+  void testCommitTransactionSuccess(Namespace namespace) {
+    verifyCreateNamespaceSucc(namespace);
+    createTable(namespace, "txn_table1");
+    createTable(namespace, "txn_table2");
+
+    TableMetadata meta1 = loadTableMeta(namespace, "txn_table1");
+    TableMetadata meta2 = loadTableMeta(namespace, "txn_table2");
+
+    CommitTransactionRequest request = buildCommitRequest(namespace, meta1, 
meta2);
+    Response response = doCommitTransaction(request);
+
+    Assertions.assertEquals(Status.NO_CONTENT.getStatusCode(), 
response.getStatus());

Review Comment:
   This verifies the HTTP 204 response but doesn't confirm that the tables were 
actually modified. Loading both tables after the commit and asserting that the 
new schema columns are present would strengthen the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to