This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f2daf967 Core: Add REST API for committing changes against multiple 
tables (#7569)
d8f2daf967 is described below

commit d8f2daf96782456fdce6fba42136a4239b3d126d
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Jun 19 17:58:09 2023 +0200

    Core: Add REST API for committing changes against multiple tables (#7569)
---
 .../java/org/apache/iceberg/BaseTransaction.java   |   4 +
 .../org/apache/iceberg/catalog/TableCommit.java    |  52 ++++++++
 .../org/apache/iceberg/rest/CatalogHandlers.java   |   2 +-
 .../java/org/apache/iceberg/rest/RESTCatalog.java  |  13 ++
 .../apache/iceberg/rest/RESTSessionCatalog.java    |  20 ++++
 .../iceberg/rest/requests/UpdateTableRequest.java  |   2 +-
 .../apache/iceberg/rest/RESTCatalogAdapter.java    |  48 +++++++-
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 131 +++++++++++++++++++++
 8 files changed, 269 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index cef487931b..61da776f4c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -107,6 +107,10 @@ public class BaseTransaction implements Transaction {
   }
 
   public TableMetadata startMetadata() {
+    return base;
+  }
+
+  public TableMetadata currentMetadata() {
     return current;
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java 
b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java
new file mode 100644
index 0000000000..a90df56c84
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import java.util.List;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.immutables.value.Value;
+
+@Value.Immutable
+public interface TableCommit {
+  TableIdentifier identifier();
+
+  List<UpdateRequirement> requirements();
+
+  List<MetadataUpdate> updates();
+
+  static TableCommit create(TableIdentifier identifier, TableMetadata base, 
TableMetadata updated) {
+    Preconditions.checkArgument(null != identifier, "Invalid table identifier: 
null");
+    Preconditions.checkArgument(null != base && null != updated, "Invalid 
table metadata: null");
+    Preconditions.checkArgument(
+        base.uuid().equals(updated.uuid()),
+        "UUID of base (%s) and updated (%s) table metadata does not match",
+        base.uuid(),
+        updated.uuid());
+
+    return ImmutableTableCommit.builder()
+        .identifier(identifier)
+        .requirements(UpdateRequirements.forUpdateTable(base, 
updated.changes()))
+        .updates(updated.changes())
+        .build();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java 
b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index eef6204fff..6fddb08757 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -314,7 +314,7 @@ public class CatalogHandlers {
     return ops.current();
   }
 
-  private static TableMetadata commit(TableOperations ops, UpdateTableRequest 
request) {
+  static TableMetadata commit(TableOperations ops, UpdateTableRequest request) 
{
     AtomicBoolean isRetry = new AtomicBoolean(false);
     try {
       Tasks.foreach(ops)
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index 71195b9585..63b660c46a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -33,16 +33,19 @@ import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SessionCatalog;
 import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableCommit;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
 public class RESTCatalog implements Catalog, SupportsNamespaces, 
Configurable<Object>, Closeable {
   private final RESTSessionCatalog sessionCatalog;
   private final Catalog delegate;
   private final SupportsNamespaces nsDelegate;
+  private final SessionCatalog.SessionContext context;
 
   public RESTCatalog() {
     this(
@@ -60,6 +63,7 @@ public class RESTCatalog implements Catalog, 
SupportsNamespaces, Configurable<Ob
     this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
     this.delegate = sessionCatalog.asCatalog(context);
     this.nsDelegate = (SupportsNamespaces) delegate;
+    this.context = context;
   }
 
   @Override
@@ -248,4 +252,13 @@ public class RESTCatalog implements Catalog, 
SupportsNamespaces, Configurable<Ob
   public void close() throws IOException {
     sessionCatalog.close();
   }
+
+  public void commitTransaction(List<TableCommit> commits) {
+    sessionCatalog.commitTransaction(context, commits);
+  }
+
+  public void commitTransaction(TableCommit... commits) {
+    sessionCatalog.commitTransaction(
+        context, ImmutableList.<TableCommit>builder().add(commits).build());
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index b83794acca..9da4e6e0fb 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.Transactions;
 import org.apache.iceberg.catalog.BaseSessionCatalog;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableCommit;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
@@ -63,13 +64,16 @@ import org.apache.iceberg.metrics.MetricsReporters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.apache.iceberg.rest.auth.OAuth2Util;
 import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ConfigResponse;
 import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
 import org.apache.iceberg.rest.responses.GetNamespaceResponse;
@@ -916,4 +920,20 @@ public class RESTSessionCatalog extends BaseSessionCatalog
                 })
         .build();
   }
+
+  public void commitTransaction(SessionContext context, List<TableCommit> 
commits) {
+    List<UpdateTableRequest> tableChanges = 
Lists.newArrayListWithCapacity(commits.size());
+
+    for (TableCommit commit : commits) {
+      tableChanges.add(
+          new UpdateTableRequest(commit.identifier(), commit.requirements(), 
commit.updates()));
+    }
+
+    client.post(
+        paths.commitTransaction(),
+        new CommitTransactionRequest(tableChanges),
+        null,
+        headers(context),
+        ErrorHandlers.tableCommitHandler());
+  }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java 
b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java
index 3e87be8e99..3249acfc8f 100644
--- 
a/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java
+++ 
b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java
@@ -48,7 +48,7 @@ public class UpdateTableRequest implements RESTRequest {
     this.updates = updates;
   }
 
-  UpdateTableRequest(
+  public UpdateTableRequest(
       TableIdentifier identifier,
       List<org.apache.iceberg.UpdateRequirement> requirements,
       List<MetadataUpdate> updates) {
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java 
b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index c6d4181844..2990fcca78 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -22,6 +22,11 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.Transactions;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -40,6 +45,8 @@ import 
org.apache.iceberg.exceptions.UnprocessableEntityException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
@@ -130,7 +137,9 @@ public class RESTCatalogAdapter implements RESTClient {
         HTTPMethod.POST,
         "v1/namespaces/{namespace}/tables/{table}/metrics",
         ReportMetricsRequest.class,
-        null);
+        null),
+    COMMIT_TRANSACTION(
+        HTTPMethod.POST, "v1/transactions/commit", 
CommitTransactionRequest.class, null);
 
     private final HTTPMethod method;
     private final int requiredLength;
@@ -357,12 +366,49 @@ public class RESTCatalogAdapter implements RESTClient {
           return null;
         }
 
+      case COMMIT_TRANSACTION:
+        {
+          CommitTransactionRequest request = 
castRequest(CommitTransactionRequest.class, body);
+          commitTransaction(catalog, request);
+          return null;
+        }
+
       default:
     }
 
     return null;
   }
 
+  /**
+   * This is a very simplistic approach that only validates the requirements for each table and does
+   * not do any other conflict detection. Therefore, it does not guarantee true transactional
+   * atomicity, which is left to the implementation details of a REST server.
+   */
+  private static void commitTransaction(Catalog catalog, 
CommitTransactionRequest request) {
+    List<Transaction> transactions = Lists.newArrayList();
+
+    for (UpdateTableRequest tableChange : request.tableChanges()) {
+      Table table = catalog.loadTable(tableChange.identifier());
+      if (table instanceof BaseTable) {
+        Transaction transaction =
+            Transactions.newTransaction(
+                tableChange.identifier().toString(), ((BaseTable) 
table).operations());
+        transactions.add(transaction);
+
+        BaseTransaction.TransactionTable txTable =
+            (BaseTransaction.TransactionTable) transaction.table();
+
+        // this performs validations and makes temporary commits that are in-memory
+        CatalogHandlers.commit(txTable.operations(), tableChange);
+      } else {
+        throw new IllegalStateException("Cannot wrap catalog that does not 
produce BaseTable");
+      }
+    }
+
+    // only commit if validations passed previously
+    transactions.forEach(Transaction::commitTransaction);
+  }
+
   public <T extends RESTResponse> T execute(
       HTTPMethod method,
       String path,
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java 
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index ac9bcd803e..d7cf707903 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.rest;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
@@ -38,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTransaction;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileScanTask;
@@ -46,9 +48,16 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.CatalogTests;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableCommit;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.jdbc.JdbcCatalog;
 import org.apache.iceberg.metrics.MetricsReport;
@@ -1872,4 +1881,126 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
             eq(catalogHeaders),
             any());
   }
+
+  @Test
+  public void diffAgainstSingleTable() {
+    Namespace namespace = Namespace.of("namespace");
+    TableIdentifier identifier = TableIdentifier.of(namespace, 
"multipleDiffsAgainstSingleTable");
+
+    Table table = catalog().buildTable(identifier, SCHEMA).create();
+    Transaction transaction = table.newTransaction();
+
+    UpdateSchema updateSchema =
+        transaction.updateSchema().addColumn("new_col", Types.LongType.get());
+    Schema expectedSchema = updateSchema.apply();
+    updateSchema.commit();
+
+    UpdatePartitionSpec updateSpec =
+        transaction.updateSpec().addField("shard", Expressions.bucket("id", 
16));
+    PartitionSpec expectedSpec = updateSpec.apply();
+    updateSpec.commit();
+
+    TableCommit tableCommit =
+        TableCommit.create(
+            identifier,
+            ((BaseTransaction) transaction).startMetadata(),
+            ((BaseTransaction) transaction).currentMetadata());
+
+    restCatalog.commitTransaction(tableCommit);
+
+    Table loaded = catalog().loadTable(identifier);
+    
assertThat(loaded.schema().asStruct()).isEqualTo(expectedSchema.asStruct());
+    assertThat(loaded.spec().fields()).isEqualTo(expectedSpec.fields());
+  }
+
+  @Test
+  public void multipleDiffsAgainstMultipleTables() {
+    Namespace namespace = Namespace.of("multiDiffNamespace");
+    TableIdentifier identifier1 = TableIdentifier.of(namespace, 
"multiDiffTable1");
+    TableIdentifier identifier2 = TableIdentifier.of(namespace, 
"multiDiffTable2");
+
+    Table table1 = catalog().buildTable(identifier1, SCHEMA).create();
+    Table table2 = catalog().buildTable(identifier2, SCHEMA).create();
+    Transaction t1Transaction = table1.newTransaction();
+    Transaction t2Transaction = table2.newTransaction();
+
+    UpdateSchema updateSchema =
+        t1Transaction.updateSchema().addColumn("new_col", 
Types.LongType.get());
+    Schema expectedSchema = updateSchema.apply();
+    updateSchema.commit();
+
+    UpdateSchema updateSchema2 =
+        t2Transaction.updateSchema().addColumn("new_col2", 
Types.LongType.get());
+    Schema expectedSchema2 = updateSchema2.apply();
+    updateSchema2.commit();
+
+    TableCommit tableCommit1 =
+        TableCommit.create(
+            identifier1,
+            ((BaseTransaction) t1Transaction).startMetadata(),
+            ((BaseTransaction) t1Transaction).currentMetadata());
+
+    TableCommit tableCommit2 =
+        TableCommit.create(
+            identifier2,
+            ((BaseTransaction) t2Transaction).startMetadata(),
+            ((BaseTransaction) t2Transaction).currentMetadata());
+
+    restCatalog.commitTransaction(tableCommit1, tableCommit2);
+
+    assertThat(catalog().loadTable(identifier1).schema().asStruct())
+        .isEqualTo(expectedSchema.asStruct());
+
+    assertThat(catalog().loadTable(identifier2).schema().asStruct())
+        .isEqualTo(expectedSchema2.asStruct());
+  }
+
+  @Test
+  public void multipleDiffsAgainstMultipleTablesLastFails() {
+    Namespace namespace = Namespace.of("multiDiffNamespace");
+    TableIdentifier identifier1 = TableIdentifier.of(namespace, 
"multiDiffTable1");
+    TableIdentifier identifier2 = TableIdentifier.of(namespace, 
"multiDiffTable2");
+
+    catalog().createTable(identifier1, SCHEMA);
+    catalog().createTable(identifier2, SCHEMA);
+
+    Table table1 = catalog().loadTable(identifier1);
+    Table table2 = catalog().loadTable(identifier2);
+    Schema originalSchemaOne = table1.schema();
+
+    Transaction t1Transaction = 
catalog().loadTable(identifier1).newTransaction();
+    t1Transaction.updateSchema().addColumn("new_col1", 
Types.LongType.get()).commit();
+
+    Transaction t2Transaction = 
catalog().loadTable(identifier2).newTransaction();
+    t2Transaction.updateSchema().renameColumn("data", "new-column").commit();
+
+    // delete the column that is being renamed in the above TX to cause a conflict
+    table2.updateSchema().deleteColumn("data").commit();
+    Schema updatedSchemaTwo = table2.schema();
+
+    TableCommit tableCommit1 =
+        TableCommit.create(
+            identifier1,
+            ((BaseTransaction) t1Transaction).startMetadata(),
+            ((BaseTransaction) t1Transaction).currentMetadata());
+
+    TableCommit tableCommit2 =
+        TableCommit.create(
+            identifier2,
+            ((BaseTransaction) t2Transaction).startMetadata(),
+            ((BaseTransaction) t2Transaction).currentMetadata());
+
+    assertThatThrownBy(() -> restCatalog.commitTransaction(tableCommit1, 
tableCommit2))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: current schema changed: 
expected id 0 != 1");
+
+    Schema schema1 = catalog().loadTable(identifier1).schema();
+    assertThat(schema1.asStruct()).isEqualTo(originalSchemaOne.asStruct());
+
+    Schema schema2 = catalog().loadTable(identifier2).schema();
+    assertThat(schema2.asStruct()).isEqualTo(updatedSchemaTwo.asStruct());
+    assertThat(schema2.findField("data")).isNull();
+    assertThat(schema2.findField("new-column")).isNull();
+    assertThat(schema2.columns()).hasSize(1);
+  }
 }

Reply via email to