bharos commented on code in PR #10675: URL: https://github.com/apache/gravitino/pull/10675#discussion_r3043317627
########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTransactionOperations.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.iceberg.service.rest; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.google.common.annotations.VisibleForTesting; +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.gravitino.Entity; +import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; +import org.apache.gravitino.iceberg.service.IcebergRESTUtils; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTransactionOperationDispatcher; +import org.apache.gravitino.listener.api.event.IcebergRequestContext; +import org.apache.gravitino.metrics.MetricNames; +import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression; +import org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata; +import org.apache.gravitino.server.web.Utils; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Handles {@code POST /v1/{prefix}/transactions/commit} for atomic multi-table commits. */ +@Path("/v1/{prefix:([^/]*/)?}transactions") +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MediaType.APPLICATION_JSON) +public class IcebergTransactionOperations { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergTransactionOperations.class); + + @Context private HttpServletRequest httpRequest; + + private final IcebergTransactionOperationDispatcher transactionOperationDispatcher; + + @Inject + public IcebergTransactionOperations( + IcebergTransactionOperationDispatcher transactionOperationDispatcher) { + this.transactionOperationDispatcher = transactionOperationDispatcher; + } + + @POST + @Path("commit") + @Produces(MediaType.APPLICATION_JSON) + @Timed(name = "commit-transaction." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) + @ResponseMetered(name = "commit-transaction", absolute = true) + @AuthorizationExpression( + expression = + "ANY(OWNER, METALAKE, CATALOG) || " + + "SCHEMA_OWNER_WITH_USE_CATALOG || " + + "ANY_USE_CATALOG", + accessMetadataType = MetadataObject.Type.CATALOG) Review Comment: This checks catalog-level permissions (`ANY_USE_CATALOG`), while the `updateTable` endpoint requires table-level permissions (`TABLE::OWNER || ANY_MODIFY_TABLE`). A user with only `USE_CATALOG` could therefore modify tables via `commitTransaction` that they couldn't modify via `updateTable`. Consider adding per-table authorization checks (e.g., in the dispatcher chain) so that every table in the transaction is held to the same `TABLE::OWNER || ANY_MODIFY_TABLE` requirement as `updateTable`. 
########## iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java: ########## @@ -251,6 +257,70 @@ public LoadTableResponse updateTable(IcebergTableChange icebergTableChange) { return loadTable(icebergTableChange.getTableIdentifier()); } + /** + * Commits multiple table updates in a best-effort atomic manner using a two-phase + * validate-then-commit approach: + * + * <ol> + * <li>Phase 1: Load all tables and validate ALL requirements against current metadata. If any + * requirement fails, the entire transaction is rejected before any commit is made. + * <li>Phase 2: Apply metadata updates and commit each table. Once phase 1 succeeds, commits are + * applied sequentially. + * </ol> + * + * <p>True cross-table atomicity (rollback of already-committed tables) depends on whether the + * underlying catalog backend supports it. Most Iceberg catalog backends do not provide + * cross-table rollback, but this two-phase approach ensures that no table is modified if any + * pre-condition check fails. + * + * <p>Each {@link UpdateTableRequest} in the request must include a {@link TableIdentifier}. + * + * @param commitTransactionRequest The request containing all table changes to apply. + */ + public void commitTransaction(CommitTransactionRequest commitTransactionRequest) { + commitTransactionRequest.validate(); + List<UpdateTableRequest> tableChanges = commitTransactionRequest.tableChanges(); + + // Phase 1: validate ALL requirements before committing anything + List<org.apache.iceberg.TableOperations> allOps = new ArrayList<>(tableChanges.size()); Review Comment: Nit: `org.apache.iceberg.TableOperations` is used as a fully qualified name inline. Per project conventions (AGENTS.md), this should use a standard `import` statement — no naming conflict here. 
########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTransactionOperations.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.iceberg.service.rest; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.listener.api.event.IcebergCommitTransactionEvent; +import org.apache.gravitino.listener.api.event.IcebergCommitTransactionFailureEvent; +import org.apache.gravitino.listener.api.event.IcebergCommitTransactionPreEvent; +import org.apache.gravitino.server.ServerConfig; +import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.UpdateRequirements; +import 
org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestIcebergTransactionOperations extends IcebergNamespaceTestBase { + + private static final Schema initialSchema = + new Schema(NestedField.of(1, false, "col_a", StringType.get())); + + private static final Schema updatedSchema = + new Schema( + NestedField.of(1, false, "col_a", StringType.get()), + NestedField.of(2, true, "col_b", StringType.get())); + + private DummyEventListener dummyEventListener; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + ResourceConfig resourceConfig = + IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTransactionOperations.class, true, Arrays.asList(dummyEventListener)); + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + 
} + }); + + GravitinoAuthorizerProvider.getInstance().initialize(new ServerConfig()); + return resourceConfig; + } + + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testCommitTransactionSuccess(Namespace namespace) { + verifyCreateNamespaceSucc(namespace); + createTable(namespace, "txn_table1"); + createTable(namespace, "txn_table2"); + + TableMetadata meta1 = loadTableMeta(namespace, "txn_table1"); + TableMetadata meta2 = loadTableMeta(namespace, "txn_table2"); + + CommitTransactionRequest request = buildCommitRequest(namespace, meta1, meta2); + Response response = doCommitTransaction(request); + + Assertions.assertEquals(Status.NO_CONTENT.getStatusCode(), response.getStatus()); Review Comment: This verifies HTTP 204 but doesn't confirm the tables were actually modified. Loading both tables after commit and asserting the new schema columns would strengthen confidence. ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTransactionOperations.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.iceberg.service.rest; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.listener.api.event.IcebergCommitTransactionEvent; +import org.apache.gravitino.listener.api.event.IcebergCommitTransactionFailureEvent; +import org.apache.gravitino.listener.api.event.IcebergCommitTransactionPreEvent; +import org.apache.gravitino.server.ServerConfig; +import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestIcebergTransactionOperations extends IcebergNamespaceTestBase { + + private static final Schema initialSchema = + new Schema(NestedField.of(1, false, "col_a", 
StringType.get())); + + private static final Schema updatedSchema = + new Schema( + NestedField.of(1, false, "col_a", StringType.get()), + NestedField.of(2, true, "col_b", StringType.get())); + + private DummyEventListener dummyEventListener; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + ResourceConfig resourceConfig = + IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTransactionOperations.class, true, Arrays.asList(dummyEventListener)); + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + } + }); + + GravitinoAuthorizerProvider.getInstance().initialize(new ServerConfig()); + return resourceConfig; + } + + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testCommitTransactionSuccess(Namespace namespace) { + verifyCreateNamespaceSucc(namespace); + createTable(namespace, "txn_table1"); + createTable(namespace, "txn_table2"); + + TableMetadata meta1 = loadTableMeta(namespace, "txn_table1"); + TableMetadata meta2 = loadTableMeta(namespace, "txn_table2"); + + CommitTransactionRequest request = buildCommitRequest(namespace, meta1, meta2); + Response response = doCommitTransaction(request); + + Assertions.assertEquals(Status.NO_CONTENT.getStatusCode(), response.getStatus()); + Assertions.assertTrue( + dummyEventListener.popPreEvent() instanceof IcebergCommitTransactionPreEvent); + Assertions.assertTrue( + dummyEventListener.popPostEvent() instanceof IcebergCommitTransactionEvent); + } + + 
@ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testCommitTransactionTableNotFound(Namespace namespace) { + verifyCreateNamespaceSucc(namespace); + + TableIdentifier missingId = TableIdentifier.of(namespace, "does_not_exist"); + UpdateTableRequest change = + UpdateTableRequest.create( + missingId, List.of(), List.of(new MetadataUpdate.AddSchema(updatedSchema))); + CommitTransactionRequest request = new CommitTransactionRequest(List.of(change)); + Response response = doCommitTransaction(request); + + Assertions.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); + Assertions.assertTrue( + dummyEventListener.popPreEvent() instanceof IcebergCommitTransactionPreEvent); + Assertions.assertTrue( + dummyEventListener.popPostEvent() instanceof IcebergCommitTransactionFailureEvent); + } Review Comment: Consider adding a test for *requirement validation failure* — e.g., two tables where one has a stale UUID. This is the core guarantee of the two-phase approach: no table should be modified when any pre-condition fails. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
