This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ba27e89c58 IGNITE-20016 Introduce bulk operation to catalog (#2530)
ba27e89c58 is described below
commit ba27e89c5834bc47a3c97af6a84d9527d3c023f3
Author: korlov42 <[email protected]>
AuthorDate: Sat Sep 2 17:54:56 2023 +0300
IGNITE-20016 Introduce bulk operation to catalog (#2530)
---
.../ignite/internal/catalog/CatalogManager.java | 15 ++++
.../internal/catalog/CatalogManagerImpl.java | 42 ++++++++++-
.../internal/catalog/CatalogManagerSelfTest.java | 83 ++++++++++++++++++++++
.../sql/engine/exec/MockedStructuresTest.java | 3 +-
4 files changed, 141 insertions(+), 2 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index 6b7e3b717e..3f292bbe3c 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.commands.AlterColumnParams;
import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
@@ -49,6 +50,20 @@ public interface CatalogManager extends IgniteComponent,
CatalogService {
*/
CompletableFuture<Void> execute(CatalogCommand command) throws
IllegalArgumentException;
+ /**
+ * Executes given list of commands atomically. That is, either all
commands will be applied at once
+ * or neither of them. The whole bulk will increment catalog's version by
a single point.
+ *
+ * <p>Accepts only those commands provided by builders returned by this
very {@link CatalogManager}.
+ * Otherwise will throw {@link IllegalArgumentException}.
+ *
+ * @param commands Commands to execute.
+ * @return Future representing result of execution.
+ * @throws IllegalArgumentException If given command was created not by
this manager.
+ * @see #createTableCommandBuilder()
+ */
+ CompletableFuture<Void> execute(List<CatalogCommand> commands) throws
IllegalArgumentException;
+
/**
* Returns builder to create a command to create a new table.
*/
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index d80c099d15..6f9efb0e6f 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -313,12 +313,27 @@ public class CatalogManagerImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> execute(CatalogCommand command) {
+ return saveUpdateAndWaitForActivation(toProducerOrThrow(command));
+ }
+
+ @Override
+ public CompletableFuture<Void> execute(List<CatalogCommand> commands)
throws IllegalArgumentException {
+ List<UpdateProducer> producers = new ArrayList<>(commands.size());
+
+ for (CatalogCommand command : commands) {
+ producers.add(toProducerOrThrow(command));
+ }
+
+ return saveUpdateAndWaitForActivation(new
BulkUpdateProducer(producers));
+ }
+
+ private static UpdateProducer toProducerOrThrow(CatalogCommand command) {
if (!(command instanceof UpdateProducer)) {
throw new IllegalArgumentException("Expected command created by
this very manager, but got "
+ (command == null ? "<null>" :
command.getClass().getCanonicalName()));
}
- return saveUpdateAndWaitForActivation((UpdateProducer) command);
+ return (UpdateProducer) command;
}
@Override
@@ -839,4 +854,29 @@ public class CatalogManagerImpl extends
Producer<CatalogEvent, CatalogEventParam
}
}
}
+
+ private static class BulkUpdateProducer implements UpdateProducer {
+ private final List<UpdateProducer> producers;
+
+ BulkUpdateProducer(List<UpdateProducer> producers) {
+ this.producers = producers;
+ }
+
+ @Override
+ public List<UpdateEntry> get(Catalog catalog) {
+ List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();
+
+ for (UpdateProducer producer : producers) {
+ List<UpdateEntry> entries = producer.get(catalog);
+
+ for (UpdateEntry entry : entries) {
+ catalog = entry.applyUpdate(catalog);
+ }
+
+ bulkUpdateEntries.addAll(entries);
+ }
+
+ return bulkUpdateEntries;
+ }
+ }
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index e95dbffe40..21f92469f9 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -47,6 +47,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1703,6 +1704,88 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
IllegalArgumentException.class,
() -> manager.execute(command)
);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.execute(List.of(
+ simpleTable("T1"),
+ simpleTable("T2"),
+ simpleTable("T3"),
+ command,
+ simpleTable("T4")
+ ))
+ );
+ }
+
+ @Test
+ void bulkCommandEitherAppliedAtomicallyOrDoesntAppliedAtAll() {
+ String tableName1 = "TEST1";
+ String tableName2 = "TEST2";
+ String tableName3 = "TEST1"; // intentional name conflict with table1
+
+ List<CatalogCommand> bulkUpdate = List.of(
+ simpleTable(tableName1),
+ simpleTable(tableName2),
+ simpleTable(tableName3)
+ );
+
+ assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+ assertThat(manager.table(tableName2, Long.MAX_VALUE), nullValue());
+ assertThat(manager.table(tableName3, Long.MAX_VALUE), nullValue());
+
+ assertThat(manager.execute(bulkUpdate),
willThrowFast(TableExistsValidationException.class));
+
+ // now let's truncate problematic table and retry
+ assertThat(manager.execute(bulkUpdate.subList(0, bulkUpdate.size() -
1)), willCompleteSuccessfully());
+
+ assertThat(manager.table(tableName1, Long.MAX_VALUE), notNullValue());
+ assertThat(manager.table(tableName2, Long.MAX_VALUE), notNullValue());
+ }
+
+ @Test
+ void bulkUpdateIncrementsVersionByOne() {
+ String tableName1 = "T1";
+ String tableName2 = "T2";
+ String tableName3 = "T3";
+
+ int versionBefore = manager.latestCatalogVersion();
+
+ assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+ assertThat(manager.table(tableName2, Long.MAX_VALUE), nullValue());
+ assertThat(manager.table(tableName3, Long.MAX_VALUE), nullValue());
+
+ assertThat(
+ manager.execute(List.of(simpleTable(tableName1),
simpleTable(tableName2), simpleTable(tableName3))),
+ willCompleteSuccessfully()
+ );
+
+ int versionAfter = manager.latestCatalogVersion();
+
+ assertThat(manager.table(tableName1, Long.MAX_VALUE), notNullValue());
+ assertThat(manager.table(tableName2, Long.MAX_VALUE), notNullValue());
+ assertThat(manager.table(tableName3, Long.MAX_VALUE), notNullValue());
+
+ assertThat(versionAfter - versionBefore, is(1));
+ }
+
+ @Test
+ void bulkUpdateDoesntIncrementVersionInCaseOfError() {
+ String tableName1 = "T1";
+
+ int versionBefore = manager.latestCatalogVersion();
+
+ assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+
+ assertThat(
+ manager.execute(List.of(simpleTable(tableName1),
simpleTable(tableName1))),
+ willThrow(CatalogValidationException.class)
+ );
+
+ int versionAfter = manager.latestCatalogVersion();
+
+ assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+
+ assertThat(versionAfter, is(versionBefore));
}
private CompletableFuture<Void> changeColumn(
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index c6badd6d72..9e90c3c4f9 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -294,7 +295,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
catalogManager = mock(CatalogManager.class);
when(catalogManager.createTableCommandBuilder()).thenReturn(new
CreateTableCommand.Builder());
- when(catalogManager.execute(any())).thenReturn(completedFuture(null));
+
when(catalogManager.execute(any(CatalogCommand.class))).thenReturn(completedFuture(null));
when(catalogManager.dropTable(any())).thenReturn(completedFuture(null));
schemaSyncService = mock(SchemaSyncService.class);