This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d8f2daf967 Core: Add REST API for committing changes against multiple
tables (#7569)
d8f2daf967 is described below
commit d8f2daf96782456fdce6fba42136a4239b3d126d
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Jun 19 17:58:09 2023 +0200
Core: Add REST API for committing changes against multiple tables (#7569)
---
.../java/org/apache/iceberg/BaseTransaction.java | 4 +
.../org/apache/iceberg/catalog/TableCommit.java | 52 ++++++++
.../org/apache/iceberg/rest/CatalogHandlers.java | 2 +-
.../java/org/apache/iceberg/rest/RESTCatalog.java | 13 ++
.../apache/iceberg/rest/RESTSessionCatalog.java | 20 ++++
.../iceberg/rest/requests/UpdateTableRequest.java | 2 +-
.../apache/iceberg/rest/RESTCatalogAdapter.java | 48 +++++++-
.../org/apache/iceberg/rest/TestRESTCatalog.java | 131 +++++++++++++++++++++
8 files changed, 269 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index cef487931b..61da776f4c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -107,6 +107,10 @@ public class BaseTransaction implements Transaction {
}
public TableMetadata startMetadata() {
+ return base;
+ }
+
+ public TableMetadata currentMetadata() {
return current;
}
diff --git a/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java
new file mode 100644
index 0000000000..a90df56c84
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import java.util.List;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.immutables.value.Value;
+
+@Value.Immutable
+public interface TableCommit {
+ TableIdentifier identifier();
+
+ List<UpdateRequirement> requirements();
+
+ List<MetadataUpdate> updates();
+
+ static TableCommit create(TableIdentifier identifier, TableMetadata base, TableMetadata updated) {
+ Preconditions.checkArgument(null != identifier, "Invalid table identifier: null");
+ Preconditions.checkArgument(null != base && null != updated, "Invalid table metadata: null");
+ Preconditions.checkArgument(
+ base.uuid().equals(updated.uuid()),
+ "UUID of base (%s) and updated (%s) table metadata does not match",
+ base.uuid(),
+ updated.uuid());
+
+ return ImmutableTableCommit.builder()
+ .identifier(identifier)
+ .requirements(UpdateRequirements.forUpdateTable(base, updated.changes()))
+ .updates(updated.changes())
+ .build();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index eef6204fff..6fddb08757 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -314,7 +314,7 @@ public class CatalogHandlers {
return ops.current();
}
- private static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
+ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
AtomicBoolean isRetry = new AtomicBoolean(false);
try {
Tasks.foreach(ops)
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index 71195b9585..63b660c46a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -33,16 +33,19 @@ import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Object>, Closeable {
private final RESTSessionCatalog sessionCatalog;
private final Catalog delegate;
private final SupportsNamespaces nsDelegate;
+ private final SessionCatalog.SessionContext context;
public RESTCatalog() {
this(
@@ -60,6 +63,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Ob
this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
this.delegate = sessionCatalog.asCatalog(context);
this.nsDelegate = (SupportsNamespaces) delegate;
+ this.context = context;
}
@Override
@@ -248,4 +252,13 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Ob
public void close() throws IOException {
sessionCatalog.close();
}
+
+ public void commitTransaction(List<TableCommit> commits) {
+ sessionCatalog.commitTransaction(context, commits);
+ }
+
+ public void commitTransaction(TableCommit... commits) {
+ sessionCatalog.commitTransaction(
+ context, ImmutableList.<TableCommit>builder().add(commits).build());
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index b83794acca..9da4e6e0fb 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.BaseSessionCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
@@ -63,13 +64,16 @@ import org.apache.iceberg.metrics.MetricsReporters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
@@ -916,4 +920,20 @@ public class RESTSessionCatalog extends BaseSessionCatalog
})
.build();
}
+
+ public void commitTransaction(SessionContext context, List<TableCommit> commits) {
+ List<UpdateTableRequest> tableChanges = Lists.newArrayListWithCapacity(commits.size());
+
+ for (TableCommit commit : commits) {
+ tableChanges.add(
+ new UpdateTableRequest(commit.identifier(), commit.requirements(), commit.updates()));
+ }
+
+ client.post(
+ paths.commitTransaction(),
+ new CommitTransactionRequest(tableChanges),
+ null,
+ headers(context),
+ ErrorHandlers.tableCommitHandler());
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java
index 3e87be8e99..3249acfc8f 100644
--- a/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java
+++ b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java
@@ -48,7 +48,7 @@ public class UpdateTableRequest implements RESTRequest {
this.updates = updates;
}
- UpdateTableRequest(
+ public UpdateTableRequest(
TableIdentifier identifier,
List<org.apache.iceberg.UpdateRequirement> requirements,
List<MetadataUpdate> updates) {
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index c6d4181844..2990fcca78 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -22,6 +22,11 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -40,6 +45,8 @@ import org.apache.iceberg.exceptions.UnprocessableEntityException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
@@ -130,7 +137,9 @@ public class RESTCatalogAdapter implements RESTClient {
HTTPMethod.POST,
"v1/namespaces/{namespace}/tables/{table}/metrics",
ReportMetricsRequest.class,
- null);
+ null),
+ COMMIT_TRANSACTION(
+ HTTPMethod.POST, "v1/transactions/commit", CommitTransactionRequest.class, null);
private final HTTPMethod method;
private final int requiredLength;
@@ -357,12 +366,49 @@ public class RESTCatalogAdapter implements RESTClient {
return null;
}
+ case COMMIT_TRANSACTION:
+ {
+ CommitTransactionRequest request = castRequest(CommitTransactionRequest.class, body);
+ commitTransaction(catalog, request);
+ return null;
+ }
+
default:
}
return null;
}
+ /**
+ * This is a very simplistic approach that only validates the requirements for each table and does
+ * not do any other conflict detection. Therefore, it does not guarantee true transactional
+ * atomicity, which is left to the implementation details of a REST server.
+ */
+ private static void commitTransaction(Catalog catalog, CommitTransactionRequest request) {
+ List<Transaction> transactions = Lists.newArrayList();
+
+ for (UpdateTableRequest tableChange : request.tableChanges()) {
+ Table table = catalog.loadTable(tableChange.identifier());
+ if (table instanceof BaseTable) {
+ Transaction transaction =
+ Transactions.newTransaction(
+ tableChange.identifier().toString(), ((BaseTable) table).operations());
+ transactions.add(transaction);
+
+ BaseTransaction.TransactionTable txTable =
+ (BaseTransaction.TransactionTable) transaction.table();
+
+ // this performs validations and makes temporary commits that are in-memory
+ CatalogHandlers.commit(txTable.operations(), tableChange);
+ } else {
+ throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+ }
+ }
+
+ // only commit if validations passed previously
+ transactions.forEach(Transaction::commitTransaction);
+ }
+
public <T extends RESTResponse> T execute(
HTTPMethod method,
String path,
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index ac9bcd803e..d7cf707903 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.rest;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@@ -38,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
@@ -46,9 +48,16 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.CatalogTests;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.metrics.MetricsReport;
@@ -1872,4 +1881,126 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
eq(catalogHeaders),
any());
}
+
+ @Test
+ public void diffAgainstSingleTable() {
+ Namespace namespace = Namespace.of("namespace");
+ TableIdentifier identifier = TableIdentifier.of(namespace, "multipleDiffsAgainstSingleTable");
+
+ Table table = catalog().buildTable(identifier, SCHEMA).create();
+ Transaction transaction = table.newTransaction();
+
+ UpdateSchema updateSchema =
+ transaction.updateSchema().addColumn("new_col", Types.LongType.get());
+ Schema expectedSchema = updateSchema.apply();
+ updateSchema.commit();
+
+ UpdatePartitionSpec updateSpec =
+ transaction.updateSpec().addField("shard", Expressions.bucket("id", 16));
+ PartitionSpec expectedSpec = updateSpec.apply();
+ updateSpec.commit();
+
+ TableCommit tableCommit =
+ TableCommit.create(
+ identifier,
+ ((BaseTransaction) transaction).startMetadata(),
+ ((BaseTransaction) transaction).currentMetadata());
+
+ restCatalog.commitTransaction(tableCommit);
+
+ Table loaded = catalog().loadTable(identifier);
+ assertThat(loaded.schema().asStruct()).isEqualTo(expectedSchema.asStruct());
+ assertThat(loaded.spec().fields()).isEqualTo(expectedSpec.fields());
+ }
+
+ @Test
+ public void multipleDiffsAgainstMultipleTables() {
+ Namespace namespace = Namespace.of("multiDiffNamespace");
+ TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1");
+ TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2");
+
+ Table table1 = catalog().buildTable(identifier1, SCHEMA).create();
+ Table table2 = catalog().buildTable(identifier2, SCHEMA).create();
+ Transaction t1Transaction = table1.newTransaction();
+ Transaction t2Transaction = table2.newTransaction();
+
+ UpdateSchema updateSchema =
+ t1Transaction.updateSchema().addColumn("new_col", Types.LongType.get());
+ Schema expectedSchema = updateSchema.apply();
+ updateSchema.commit();
+
+ UpdateSchema updateSchema2 =
+ t2Transaction.updateSchema().addColumn("new_col2", Types.LongType.get());
+ Schema expectedSchema2 = updateSchema2.apply();
+ updateSchema2.commit();
+
+ TableCommit tableCommit1 =
+ TableCommit.create(
+ identifier1,
+ ((BaseTransaction) t1Transaction).startMetadata(),
+ ((BaseTransaction) t1Transaction).currentMetadata());
+
+ TableCommit tableCommit2 =
+ TableCommit.create(
+ identifier2,
+ ((BaseTransaction) t2Transaction).startMetadata(),
+ ((BaseTransaction) t2Transaction).currentMetadata());
+
+ restCatalog.commitTransaction(tableCommit1, tableCommit2);
+
+ assertThat(catalog().loadTable(identifier1).schema().asStruct())
+ .isEqualTo(expectedSchema.asStruct());
+
+ assertThat(catalog().loadTable(identifier2).schema().asStruct())
+ .isEqualTo(expectedSchema2.asStruct());
+ }
+
+ @Test
+ public void multipleDiffsAgainstMultipleTablesLastFails() {
+ Namespace namespace = Namespace.of("multiDiffNamespace");
+ TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1");
+ TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2");
+
+ catalog().createTable(identifier1, SCHEMA);
+ catalog().createTable(identifier2, SCHEMA);
+
+ Table table1 = catalog().loadTable(identifier1);
+ Table table2 = catalog().loadTable(identifier2);
+ Schema originalSchemaOne = table1.schema();
+
+ Transaction t1Transaction = catalog().loadTable(identifier1).newTransaction();
+ t1Transaction.updateSchema().addColumn("new_col1", Types.LongType.get()).commit();
+
+ Transaction t2Transaction = catalog().loadTable(identifier2).newTransaction();
+ t2Transaction.updateSchema().renameColumn("data", "new-column").commit();
+
+ // delete the column that is being renamed in the above TX to cause a conflict
+ table2.updateSchema().deleteColumn("data").commit();
+ Schema updatedSchemaTwo = table2.schema();
+
+ TableCommit tableCommit1 =
+ TableCommit.create(
+ identifier1,
+ ((BaseTransaction) t1Transaction).startMetadata(),
+ ((BaseTransaction) t1Transaction).currentMetadata());
+
+ TableCommit tableCommit2 =
+ TableCommit.create(
+ identifier2,
+ ((BaseTransaction) t2Transaction).startMetadata(),
+ ((BaseTransaction) t2Transaction).currentMetadata());
+
+ assertThatThrownBy(() -> restCatalog.commitTransaction(tableCommit1, tableCommit2))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+
+ Schema schema1 = catalog().loadTable(identifier1).schema();
+ assertThat(schema1.asStruct()).isEqualTo(originalSchemaOne.asStruct());
+
+ Schema schema2 = catalog().loadTable(identifier2).schema();
+ assertThat(schema2.asStruct()).isEqualTo(updatedSchemaTwo.asStruct());
+ assertThat(schema2.findField("data")).isNull();
+ assertThat(schema2.findField("new-column")).isNull();
+ assertThat(schema2.columns()).hasSize(1);
+ }
}