This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ba27e89c58 IGNITE-20016 Introduce bulk operation to catalog (#2530)
ba27e89c58 is described below

commit ba27e89c5834bc47a3c97af6a84d9527d3c023f3
Author: korlov42 <[email protected]>
AuthorDate: Sat Sep 2 17:54:56 2023 +0300

    IGNITE-20016 Introduce bulk operation to catalog (#2530)
---
 .../ignite/internal/catalog/CatalogManager.java    | 15 ++++
 .../internal/catalog/CatalogManagerImpl.java       | 42 ++++++++++-
 .../internal/catalog/CatalogManagerSelfTest.java   | 83 ++++++++++++++++++++++
 .../sql/engine/exec/MockedStructuresTest.java      |  3 +-
 4 files changed, 141 insertions(+), 2 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index 6b7e3b717e..3f292bbe3c 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.catalog;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.catalog.commands.AlterColumnParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
@@ -49,6 +50,20 @@ public interface CatalogManager extends IgniteComponent, 
CatalogService {
      */
     CompletableFuture<Void> execute(CatalogCommand command) throws 
IllegalArgumentException;
 
+    /**
+     * Executes the given list of commands atomically. That is, either all 
commands will be applied at once
+     * or none of them. The whole bulk will increment catalog's version by 
a single point.
+     *
+     * <p>Accepts only those commands provided by builders returned by this 
very {@link CatalogManager}.
+     * Otherwise will throw {@link IllegalArgumentException}.
+     *
+     * @param commands Commands to execute.
+     * @return Future representing result of execution.
+     * @throws IllegalArgumentException If any of the given commands was not created by 
this manager.
+     * @see #createTableCommandBuilder()
+     */
+    CompletableFuture<Void> execute(List<CatalogCommand> commands) throws 
IllegalArgumentException;
+
     /**
      * Returns builder to create a command to create a new table.
      */
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index d80c099d15..6f9efb0e6f 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -313,12 +313,27 @@ public class CatalogManagerImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> execute(CatalogCommand command) {
+        return saveUpdateAndWaitForActivation(toProducerOrThrow(command));
+    }
+
+    @Override
+    public CompletableFuture<Void> execute(List<CatalogCommand> commands) 
throws IllegalArgumentException {
+        List<UpdateProducer> producers = new ArrayList<>(commands.size());
+
+        for (CatalogCommand command : commands) {
+            producers.add(toProducerOrThrow(command));
+        }
+
+        return saveUpdateAndWaitForActivation(new 
BulkUpdateProducer(producers));
+    }
+
+    private static UpdateProducer toProducerOrThrow(CatalogCommand command) {
         if (!(command instanceof UpdateProducer)) {
             throw new IllegalArgumentException("Expected command created by 
this very manager, but got "
                     + (command == null ? "<null>" : 
command.getClass().getCanonicalName()));
         }
 
-        return saveUpdateAndWaitForActivation((UpdateProducer) command);
+        return (UpdateProducer) command;
     }
 
     @Override
@@ -839,4 +854,29 @@ public class CatalogManagerImpl extends 
Producer<CatalogEvent, CatalogEventParam
             }
         }
     }
+
+    private static class BulkUpdateProducer implements UpdateProducer {
+        private final List<UpdateProducer> producers;
+
+        BulkUpdateProducer(List<UpdateProducer> producers) {
+            this.producers = producers;
+        }
+
+        @Override
+        public List<UpdateEntry> get(Catalog catalog) {
+            List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();
+
+            for (UpdateProducer producer : producers) {
+                List<UpdateEntry> entries = producer.get(catalog);
+
+                for (UpdateEntry entry : entries) {
+                    catalog = entry.applyUpdate(catalog);
+                }
+
+                bulkUpdateEntries.addAll(entries);
+            }
+
+            return bulkUpdateEntries;
+        }
+    }
 }
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index e95dbffe40..21f92469f9 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -47,6 +47,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1703,6 +1704,88 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
                 IllegalArgumentException.class,
                 () -> manager.execute(command)
         );
+
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> manager.execute(List.of(
+                        simpleTable("T1"),
+                        simpleTable("T2"),
+                        simpleTable("T3"),
+                        command,
+                        simpleTable("T4")
+                ))
+        );
+    }
+
+    @Test
+    void bulkCommandEitherAppliedAtomicallyOrDoesntAppliedAtAll() {
+        String tableName1 = "TEST1";
+        String tableName2 = "TEST2";
+        String tableName3 = "TEST1"; // intentional name conflict with table1
+
+        List<CatalogCommand> bulkUpdate = List.of(
+                simpleTable(tableName1),
+                simpleTable(tableName2),
+                simpleTable(tableName3)
+        );
+
+        assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+        assertThat(manager.table(tableName2, Long.MAX_VALUE), nullValue());
+        assertThat(manager.table(tableName3, Long.MAX_VALUE), nullValue());
+
+        assertThat(manager.execute(bulkUpdate), 
willThrowFast(TableExistsValidationException.class));
+
+        // now let's drop the conflicting command from the bulk and retry
+        assertThat(manager.execute(bulkUpdate.subList(0, bulkUpdate.size() - 
1)), willCompleteSuccessfully());
+
+        assertThat(manager.table(tableName1, Long.MAX_VALUE), notNullValue());
+        assertThat(manager.table(tableName2, Long.MAX_VALUE), notNullValue());
+    }
+
+    @Test
+    void bulkUpdateIncrementsVersionByOne() {
+        String tableName1 = "T1";
+        String tableName2 = "T2";
+        String tableName3 = "T3";
+
+        int versionBefore = manager.latestCatalogVersion();
+
+        assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+        assertThat(manager.table(tableName2, Long.MAX_VALUE), nullValue());
+        assertThat(manager.table(tableName3, Long.MAX_VALUE), nullValue());
+
+        assertThat(
+                manager.execute(List.of(simpleTable(tableName1), 
simpleTable(tableName2), simpleTable(tableName3))),
+                willCompleteSuccessfully()
+        );
+
+        int versionAfter = manager.latestCatalogVersion();
+
+        assertThat(manager.table(tableName1, Long.MAX_VALUE), notNullValue());
+        assertThat(manager.table(tableName2, Long.MAX_VALUE), notNullValue());
+        assertThat(manager.table(tableName3, Long.MAX_VALUE), notNullValue());
+
+        assertThat(versionAfter - versionBefore, is(1));
+    }
+
+    @Test
+    void bulkUpdateDoesntIncrementVersionInCaseOfError() {
+        String tableName1 = "T1";
+
+        int versionBefore = manager.latestCatalogVersion();
+
+        assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+
+        assertThat(
+                manager.execute(List.of(simpleTable(tableName1), 
simpleTable(tableName1))),
+                willThrow(CatalogValidationException.class)
+        );
+
+        int versionAfter = manager.latestCatalogVersion();
+
+        assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue());
+
+        assertThat(versionAfter, is(versionBefore));
     }
 
     private CompletableFuture<Void> changeColumn(
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index c6badd6d72..9e90c3c4f9 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -294,7 +295,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         catalogManager = mock(CatalogManager.class);
         when(catalogManager.createTableCommandBuilder()).thenReturn(new 
CreateTableCommand.Builder());
-        when(catalogManager.execute(any())).thenReturn(completedFuture(null));
+        
when(catalogManager.execute(any(CatalogCommand.class))).thenReturn(completedFuture(null));
         
when(catalogManager.dropTable(any())).thenReturn(completedFuture(null));
 
         schemaSyncService = mock(SchemaSyncService.class);

Reply via email to