This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 44c56eda3 IGNITE-16918 Sql. Races during table creation (#838)
44c56eda3 is described below

commit 44c56eda38427206e933b4e36ed69d50e8e612ea
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Jun 2 15:52:42 2022 +0300

    IGNITE-16918 Sql. Races during table creation (#838)
---
 .../ignite/internal/causality/VersionedValue.java  |   4 +
 .../internal/causality/VersionedValueTest.java     |  99 ++++++++++++------
 .../runner/app/ItIgniteNodeRestartTest.java        |   8 +-
 .../ignite/internal/schema/SchemaManager.java      |   4 -
 .../ignite/internal/schema/event/SchemaEvent.java  |   5 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    | 116 ++++++++++-----------
 .../engine/exec/schema/SqlSchemaManagerTest.java   |  13 +++
 .../internal/table/distributed/TableManager.java   |  53 +++++-----
 .../ignite/internal/table/TableManagerTest.java    |  75 ++++++++-----
 9 files changed, 221 insertions(+), 156 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
index 6e43431ff..10b3df8c3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
@@ -285,6 +285,8 @@ public class VersionedValue<T> {
         checkToken(actualToken0, causalityToken);
 
         completeInternal(causalityToken, value, null);
+
+        completeOnRevision(causalityToken);
     }
 
     /**
@@ -304,6 +306,8 @@ public class VersionedValue<T> {
         checkToken(actualToken0, causalityToken);
 
         completeInternal(causalityToken, null, throwable);
+
+        completeOnRevision(causalityToken);
     }
 
     /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
index 791e1138c..14fac51c5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteTriConsumer;
 import org.junit.jupiter.api.BeforeEach;
@@ -53,6 +54,9 @@ public class VersionedValueTest {
     /** Test value. */
     public static final int TEST_VALUE = 1;
 
+    /** Test exception used to complete a versioned value object 
exceptionally. */
+    public static final Exception TEST_EXCEPTION = new Exception("Test 
exception.");
+
     /** The test revision register is used to move the revision forward. */
     public static final TestRevisionRegister REGISTER = new 
TestRevisionRegister();
 
@@ -68,7 +72,7 @@ public class VersionedValueTest {
      */
     @Test
     public void testGetValueBeforeReady() throws OutdatedTokenException {
-        VersionedValue<Integer> intVersionedValue = new 
VersionedValue<>(REGISTER, 2, null);
+        VersionedValue<Integer> intVersionedValue = new VersionedValue<>(null);
 
         CompletableFuture<Integer> fut = intVersionedValue.get(0);
 
@@ -76,8 +80,6 @@ public class VersionedValueTest {
 
         intVersionedValue.complete(0L, TEST_VALUE);
 
-        REGISTER.moveRevision(0L).join();
-
         assertTrue(fut.isDone());
 
         assertEquals(TEST_VALUE, fut.join());
@@ -85,6 +87,50 @@ public class VersionedValueTest {
         assertSame(fut.join(), intVersionedValue.get(0).join());
     }
 
+    /**
+     * Test checks completion of several sequential updates.
+     */
+    @Test
+    public void testManualCompleteSeveralTokens() {
+        VersionedValue<Integer> intVersionedValue = new VersionedValue<>(null);
+
+        IntStream.range(5, 10).forEach(token -> {
+            CompletableFuture<Integer> fut = intVersionedValue.get(token);
+
+            assertFalse(fut.isDone());
+
+            intVersionedValue.complete(token, TEST_VALUE);
+
+            assertTrue(fut.isDone());
+
+            assertEquals(TEST_VALUE, fut.join());
+
+            assertSame(fut.join(), intVersionedValue.get(token).join());
+        });
+    }
+
+    /**
+     * Test checks exceptional completion of several sequential updates.
+     */
+    @Test
+    public void testManualExceptionallyCompleteSeveralTokens() {
+        VersionedValue<Integer> intVersionedValue = new VersionedValue<>(null);
+
+        IntStream.range(5, 10).forEach(token -> {
+            CompletableFuture<Integer> fut = intVersionedValue.get(token);
+
+            assertFalse(fut.isDone());
+
+            intVersionedValue.completeExceptionally(token, TEST_EXCEPTION);
+
+            assertTrue(fut.isDone());
+
+            assertThrows(Exception.class, fut::get);
+
+            assertThrows(Exception.class, () -> 
intVersionedValue.get(token).get());
+        });
+    }
+
     /**
      * The test explicitly sets a value to {@link VersionedValue} without 
waiting for the revision update.
      *
@@ -115,17 +161,15 @@ public class VersionedValueTest {
      */
     @Test
     public void testMissValueUpdateBeforeReady() throws OutdatedTokenException 
{
-        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
+        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(null);
 
         longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision(0L).join();
-
         CompletableFuture<Integer> fut = longVersionedValue.get(1);
 
         assertFalse(fut.isDone());
 
-        REGISTER.moveRevision(1L).join();
+        longVersionedValue.complete(1);
 
         assertTrue(fut.isDone());
 
@@ -142,12 +186,11 @@ public class VersionedValueTest {
      */
     @Test
     public void testMissValueUpdate() throws OutdatedTokenException {
-        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
+        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(null);
 
         longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision(0L).join();
-        REGISTER.moveRevision(1L).join();
+        longVersionedValue.complete(1);
 
         CompletableFuture<Integer> fut = longVersionedValue.get(1);
 
@@ -163,16 +206,12 @@ public class VersionedValueTest {
      */
     @Test
     public void testObsoleteToken() {
-        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
+        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(null);
 
         longVersionedValue.complete(0, TEST_VALUE);
-
-        REGISTER.moveRevision(0L).join();
-
         longVersionedValue.complete(1, TEST_VALUE);
 
-        REGISTER.moveRevision(1L).join();
-        REGISTER.moveRevision(2L).join();
+        longVersionedValue.complete(2);
 
         assertThrowsExactly(OutdatedTokenException.class, () -> 
longVersionedValue.get(0));
     }
@@ -182,18 +221,16 @@ public class VersionedValueTest {
      */
     @Test
     public void testAutocompleteFuture() throws OutdatedTokenException {
-        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
+        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(null);
 
         longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision(0L).join();
-
         CompletableFuture<Integer> fut = longVersionedValue.get(1);
 
         assertFalse(fut.isDone());
 
-        REGISTER.moveRevision(1L).join();
-        REGISTER.moveRevision(2L).join();
+        longVersionedValue.complete(1);
+        longVersionedValue.complete(2);
 
         assertTrue(fut.isDone());
         assertTrue(longVersionedValue.get(2).isDone());
@@ -208,7 +245,7 @@ public class VersionedValueTest {
     public void testUpdate() throws Exception {
         VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
 
-        longVersionedValue.complete(0, TEST_VALUE);
+        longVersionedValue.update(0, (integer, throwable) -> 
CompletableFuture.completedFuture(TEST_VALUE));
 
         REGISTER.moveRevision(0L).join();
 
@@ -465,7 +502,7 @@ public class VersionedValueTest {
      */
     @Test
     public void testWhenComplete() {
-        VersionedValue<Integer> vv = new VersionedValue<>(REGISTER);
+        VersionedValue<Integer> vv = new VersionedValue<>(null);
 
         AtomicInteger a = new AtomicInteger();
         AtomicInteger cntr = new AtomicInteger(-1);
@@ -490,13 +527,11 @@ public class VersionedValueTest {
         vv.complete(token, TEST_VALUE);
 
         assertThrows(AssertionError.class, () -> vv.complete(finalToken0, 0));
-        assertThrows(AssertionError.class, () -> 
vv.completeExceptionally(finalToken0, new Exception()));
+        assertThrows(AssertionError.class, () -> 
vv.completeExceptionally(finalToken0, TEST_EXCEPTION));
 
         assertEquals(TEST_VALUE, a.get());
         assertEquals(token, cntr.get());
 
-        REGISTER.moveRevision(token).join();
-
         // Test update.
         token = 1;
 
@@ -504,7 +539,7 @@ public class VersionedValueTest {
 
         assertEquals(TEST_VALUE, a.get());
 
-        REGISTER.moveRevision(token).join();
+        vv.complete(token);
 
         assertEquals(TEST_VALUE + 1, a.get());
         assertEquals(token, cntr.get());
@@ -512,7 +547,7 @@ public class VersionedValueTest {
         // Test move revision.
         token = 2;
 
-        REGISTER.moveRevision(token).join();
+        vv.complete(token);
 
         assertEquals(TEST_VALUE + 1, a.get());
         assertEquals(token, cntr.get());
@@ -522,16 +557,14 @@ public class VersionedValueTest {
 
         final long finalToken3 = token;
 
-        vv.completeExceptionally(token, new Exception());
+        vv.completeExceptionally(token, TEST_EXCEPTION);
 
         assertThrows(AssertionError.class, () -> vv.complete(finalToken3, 0));
-        assertThrows(AssertionError.class, () -> 
vv.completeExceptionally(finalToken3, new Exception()));
+        assertThrows(AssertionError.class, () -> 
vv.completeExceptionally(finalToken3, TEST_EXCEPTION));
 
         assertEquals(-1, a.get());
         assertEquals(token, cntr.get());
 
-        REGISTER.moveRevision(token).join();
-
         assertEquals(token, cntr.get());
 
         // Test remove listener.
@@ -545,8 +578,6 @@ public class VersionedValueTest {
 
         assertEquals(0, a.get());
         assertEquals(token - 1, cntr.get());
-
-        REGISTER.moveRevision(token).join();
     }
 
     /**
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f7d77980f..d2ac1d9ec 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -71,6 +72,7 @@ import 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
 import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
 import org.apache.ignite.internal.rest.RestComponent;
 import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModule;
 import org.apache.ignite.internal.storage.DataStorageModules;
@@ -279,6 +281,9 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 schemaManager
         );
 
+        //TODO: Get rid of it after IGNITE-17062.
+        SqlQueryProcessor queryProcessor = new SqlQueryProcessor(registry, 
clusterSvc, tableManager, dataStorageManager, Map::of);
+
         // Preparing the result map.
 
         res.add(vault);
@@ -313,7 +318,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 clusterCfgMgr,
                 dataStorageManager,
                 schemaManager,
-                tableManager
+                tableManager,
+                queryProcessor
         );
 
         for (IgniteComponent component : otherComponents) {
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 791a22b6a..de2ccfa96 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -77,10 +77,6 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
 
         this.tablesCfg = tablesCfg;
-
-        registriesVv.whenComplete((token, registries, e) -> {
-            fireEvent(SchemaEvent.COMPLETE, new SchemaEventParameters(token, 
null, null), e);
-        });
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
index 22b2cca82..04d51deb8 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
@@ -24,8 +24,5 @@ import org.apache.ignite.internal.manager.Event;
  */
 public enum SchemaEvent implements Event {
     /** This event is fired when a schema was created. */
-    CREATE,
-
-    /** This event is fired when new schema manager revision is complete. */
-    COMPLETE
+    CREATE
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 5a6a74132..be2ed7b89 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -72,13 +72,19 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
         schemasVv = new VersionedValue<>(registry, HashMap::new);
         tablesVv = new VersionedValue<>(registry, HashMap::new);
 
-        calciteSchemaVv = new VersionedValue<>(registry, () -> {
+        calciteSchemaVv = new VersionedValue<>(null, () -> {
             SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
             newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new 
IgniteSchema(DEFAULT_SCHEMA_NAME));
             return newCalciteSchema;
         });
 
-        calciteSchemaVv.whenComplete((token, schema, e) -> 
listeners.forEach(SchemaUpdateListener::onSchemaUpdated));
+        schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> {
+            rebuild(token, stringIgniteSchemaMap);
+
+            listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
+
+            tableManager.onSqlSchemaReady(token);
+        });
     }
 
     /** {@inheritDoc} */
@@ -143,22 +149,20 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
      * @param causalityToken Causality token.
      */
     public synchronized void onSchemaCreated(String schemaName, long 
causalityToken) {
-        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(
+        schemasVv.update(
                 causalityToken,
                 (schemas, e) -> {
                     if (e != null) {
                         return failedFuture(e);
                     }
 
-                    Map<String, IgniteSchema> res =  new HashMap<>(schemas);
+                    Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                     res.putIfAbsent(schemaName, new IgniteSchema(schemaName));
 
                     return completedFuture(res);
                 }
         );
-
-        rebuild(causalityToken, schemasMapFut);
     }
 
     /**
@@ -168,22 +172,20 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
      * @param causalityToken Causality token.
      */
     public synchronized void onSchemaDropped(String schemaName, long 
causalityToken) {
-        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(
-                    causalityToken,
-                    (schemas, e) -> {
-                        if (e != null) {
-                            return failedFuture(e);
-                        }
+        schemasVv.update(
+                causalityToken,
+                (schemas, e) -> {
+                    if (e != null) {
+                        return failedFuture(e);
+                    }
 
-                        Map<String, IgniteSchema> res = new HashMap<>(schemas);
+                    Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
-                        res.remove(schemaName);
+                    res.remove(schemaName);
 
-                        return completedFuture(res);
-                    }
+                    return completedFuture(res);
+                }
         );
-
-        rebuild(causalityToken, schemasMapFut);
     }
 
     /**
@@ -195,7 +197,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
             TableImpl table,
             long causalityToken
     ) {
-        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(
+        schemasVv.update(
                 causalityToken,
                 (schemas, e) -> {
                     if (e != null) {
@@ -211,25 +213,23 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                     schema.addTable(removeSchema(schemaName, table.name()), 
igniteTable);
 
                     return tablesVv
-                        .update(
-                            causalityToken,
-                            (tables, ex) -> {
-                                if (ex != null) {
-                                    return failedFuture(ex);
-                                }
+                            .update(
+                                    causalityToken,
+                                    (tables, ex) -> {
+                                        if (ex != null) {
+                                            return failedFuture(ex);
+                                        }
 
-                                Map<UUID, IgniteTable> resTbls = new 
HashMap<>(tables);
+                                        Map<UUID, IgniteTable> resTbls = new 
HashMap<>(tables);
 
-                                resTbls.put(igniteTable.id(), igniteTable);
+                                        resTbls.put(igniteTable.id(), 
igniteTable);
 
-                                return completedFuture(resTbls);
-                            }
-                        )
-                        .thenCompose(tables -> completedFuture(res));
+                                        return completedFuture(resTbls);
+                                    }
+                            )
+                            .thenCompose(tables -> completedFuture(res));
                 }
         );
-
-        rebuild(causalityToken, schemasMapFut);
     }
 
     /**
@@ -253,7 +253,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
             String tableName,
             long causalityToken
     ) {
-        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(causalityToken,
+        schemasVv.update(causalityToken,
                 (schemas, e) -> {
                     if (e != null) {
                         return failedFuture(e);
@@ -271,45 +271,41 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                         schema.removeTable(calciteTableName);
 
                         return tablesVv
-                            .update(causalityToken,
-                                (tables, ex) -> {
-                                    if (ex != null) {
-                                        return failedFuture(ex);
-                                    }
+                                .update(causalityToken,
+                                        (tables, ex) -> {
+                                            if (ex != null) {
+                                                return failedFuture(ex);
+                                            }
 
-                                    Map<UUID, IgniteTable> resTbls = new 
HashMap<>(tables);
+                                            Map<UUID, IgniteTable> resTbls = 
new HashMap<>(tables);
 
-                                    resTbls.remove(table.id());
+                                            resTbls.remove(table.id());
 
-                                    return completedFuture(resTbls);
-                                }
-                            )
-                            .thenCompose(tables -> completedFuture(res));
+                                            return completedFuture(resTbls);
+                                        }
+                                )
+                                .thenCompose(tables -> completedFuture(res));
                     }
 
                     return completedFuture(res);
                 }
         );
-
-        rebuild(causalityToken, schemasMapFut);
     }
 
-    private void rebuild(long causalityToken, CompletableFuture<Map<String, 
IgniteSchema>> schemasFut) {
-        schemasFut.thenCompose(schemas -> {
-            SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
-
-            newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
+    /**
+     * Rebuilds Calcite schemas.
+     *
+     * @param causalityToken Causality token.
+     * @param schemas Ignite schemas.
+     */
+    private void rebuild(long causalityToken, Map<String, IgniteSchema> 
schemas) {
+        SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
 
-            schemas.forEach(newCalciteSchema::add);
+        newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
 
-            return calciteSchemaVv.update(causalityToken, (s, e) -> {
-                if (e != null) {
-                    return failedFuture(e);
-                }
+        schemas.forEach(newCalciteSchema::add);
 
-                return completedFuture(newCalciteSchema);
-            });
-        });
+        calciteSchemaVv.complete(causalityToken, newCalciteSchema);
     }
 
     private IgniteTableImpl convert(TableImpl table) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index 79dd5df1a..58f8f20cb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -25,6 +25,7 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -105,6 +106,9 @@ public class SqlSchemaManagerTest {
         assertThat(ex.getMessage(), containsString("Table not found"));
 
         Mockito.verify(tableManager).table(eq(tblId));
+
+        Mockito.verify(tableManager, times(1)).onSqlSchemaReady(anyLong());
+
         Mockito.verifyNoMoreInteractions(tableManager);
     }
 
@@ -125,6 +129,9 @@ public class SqlSchemaManagerTest {
         assertEquals(tableId, actTable.id());
 
         Mockito.verify(tableManager).table(eq(tableId));
+
+        Mockito.verify(tableManager, times(1)).onSqlSchemaReady(anyLong());
+
         Mockito.verifyNoMoreInteractions(tableManager);
     }
 
@@ -147,6 +154,8 @@ public class SqlSchemaManagerTest {
 
         assertEquals(tableId, actTable.id());
 
+        Mockito.verify(tableManager, times(2)).onSqlSchemaReady(anyLong());
+
         Mockito.verifyNoMoreInteractions(tableManager);
     }
 
@@ -169,6 +178,8 @@ public class SqlSchemaManagerTest {
 
         assertEquals(tableId, actTable.id());
 
+        Mockito.verify(tableManager, times(2)).onSqlSchemaReady(anyLong());
+
         Mockito.verifyNoMoreInteractions(tableManager);
     }
 
@@ -197,6 +208,8 @@ public class SqlSchemaManagerTest {
         assertThat(ex.getMessage(), containsString("Table version not found"));
 
         Mockito.verify(tableManager, times(2)).table(eq(tableId));
+        Mockito.verify(tableManager, times(2)).onSqlSchemaReady(anyLong());
+
         Mockito.verifyNoMoreInteractions(tableManager);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 61179919b..b7d57e846 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -37,7 +37,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -192,12 +191,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         clusterNodeResolver = topologyService::getByAddress;
 
         tablesByIdVv = new VersionedValue<>(null, HashMap::new);
-
-        this.schemaManager.listen(SchemaEvent.COMPLETE, (parameters, e) -> {
-            tablesByIdVv.complete(parameters.causalityToken());
-
-            return false;
-        });
     }
 
     /** {@inheritDoc} */
@@ -228,19 +221,30 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         schemaManager.listen(SchemaEvent.CREATE, new EventListener<>() {
             /** {@inheritDoc} */
-            @Override public boolean notify(@NotNull SchemaEventParameters 
parameters, @Nullable Throwable exception) {
-                
tablesByIdVv.get(parameters.causalityToken()).thenAccept(tablesById -> {
+            @Override
+            public boolean notify(@NotNull SchemaEventParameters parameters, 
@Nullable Throwable exception) {
+                if (tablesByIdVv.latest().get(parameters.tableId()) != null) {
                     fireEvent(
                             TableEvent.ALTER,
-                            new 
TableEventParameters(parameters.causalityToken(), 
tablesById.get(parameters.tableId())), null
+                            new 
TableEventParameters(parameters.causalityToken(), 
tablesByIdVv.latest().get(parameters.tableId())), null
                     );
-                });
+                }
 
                 return false;
             }
         });
     }
 
+    /**
+     * Completes all table futures.
+     * TODO: Get rid of it after IGNITE-17062.
+     *
+     * @param causalityToken Causality token.
+     */
+    public void onSqlSchemaReady(long causalityToken) {
+        tablesByIdVv.complete(causalityToken);
+    }
+
     /**
      * Listener of table create configuration change.
      *
@@ -447,8 +451,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         var table = new TableImpl(internalTable);
 
-        CompletableFuture<Void> schemaFut = 
schemaManager.schemaRegistry(causalityToken, 
tblId).thenAccept(table::schemaView);
-
         tablesByIdVv.update(causalityToken, (previous, e) -> {
             if (e != null) {
                 return failedFuture(e);
@@ -461,14 +463,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             return completedFuture(val);
         });
 
-        // TODO should be reworked in IGNITE-16763
-        return tablesByIdVv.get(causalityToken)
-            .thenCompose(v -> schemaFut)
-            .thenRun(() -> {
-                fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, table), null);
+        schemaManager.schemaRegistry(causalityToken, tblId)
+                .thenAccept(table::schemaView)
+                .thenRun(() -> fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, table), null));
 
-                completeApiCreateFuture(table);
-            });
+        // TODO should be reworked in IGNITE-16763
+        return tablesByIdVv.get(causalityToken).thenRun(() -> 
completeApiCreateFuture(table));
     }
 
     /**
@@ -502,8 +502,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 raftMgr.stopRaftGroup(raftGroupName(tblId, p));
             }
 
-            AtomicReference<TableImpl> tableHolder = new AtomicReference<>();
-
             tablesByIdVv.update(causalityToken, (previousVal, e) -> {
                 if (e != null) {
                     return failedFuture(e);
@@ -511,16 +509,15 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 var map = new HashMap<>(previousVal);
 
-                TableImpl table = map.remove(tblId);
-
-                tableHolder.set(table);
+                map.remove(tblId);
 
                 return completedFuture(map);
             });
 
-            TableImpl table = tableHolder.get();
+            TableImpl table = tablesByIdVv.latest().get(tblId);
 
-            assert table != null : "There is no table with the name specified 
[name=" + name + ']';
+            assert table != null : IgniteStringFormatter.format("There is no 
table with the name specified [name={}, id={}]",
+                    name, tblId);
 
             table.internalTable().storage().destroy();
 
@@ -1046,7 +1043,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 }
 
                 if (e == null) {
-                    getTblFut.complete(parameters.table());
+                    tablesByIdVv.get(parameters.causalityToken()).thenRun(() 
-> getTblFut.complete(parameters.table()));
                 } else {
                     getTblFut.completeExceptionally(e);
                 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index c60f0ef47..2d24d0511 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -43,7 +43,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.configuration.NamedListView;
@@ -79,6 +82,7 @@ import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMe
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageConfigurationSchema;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
@@ -231,20 +235,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         when(rm.updateRaftGroup(any(), any(), any(), any())).thenAnswer(mock ->
                 
CompletableFuture.completedFuture(mock(RaftGroupService.class)));
 
-        TableManager tableManager = new TableManager(
-                revisionUpdater,
-                tblsCfg,
-                rm,
-                bm,
-                ts,
-                tm,
-                dsm = createDataStorageManager(configRegistry, workDir, 
pageMemoryEngineConfig),
-                sm = new SchemaManager(revisionUpdater, tblsCfg)
-        );
-
-        sm.start();
-
-        tableManager.start();
+        TableManager tableManager = createTableManager(tblManagerFut, false);
 
         tblManagerFut.complete(tableManager);
 
@@ -339,7 +330,7 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     @Test
     public void testApiTableManagerOnStop() {
-        createTableManager(tblManagerFut);
+        createTableManager(tblManagerFut, false);
 
         TableManager tableManager = tblManagerFut.join();
 
@@ -390,7 +381,7 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     @Test
     public void testInternalApiTableManagerOnStop() {
-        createTableManager(tblManagerFut);
+        createTableManager(tblManagerFut, false);
 
         TableManager tableManager = tblManagerFut.join();
 
@@ -443,7 +434,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() 
-> {
             try {
                 return mockManagersAndCreateTableWithDelay(scmTbl, 
tblManagerFut, phaser);
-            } catch (NodeStoppingException e) {
+            } catch (Exception e) {
                 fail(e.getMessage());
             }
 
@@ -505,12 +496,12 @@ public class TableManagerTest extends IgniteAbstractTest {
      * @param tableDefinition Configuration schema for a table.
      * @param tblManagerFut Future for table manager.
      * @return Table.
-     * @throws NodeStoppingException If something went wrong.
+     * @throws Exception If something went wrong.
      */
     private TableImpl mockManagersAndCreateTable(
             TableDefinition tableDefinition,
             CompletableFuture<TableManager> tblManagerFut
-    ) throws NodeStoppingException {
+    ) throws Exception {
         return mockManagersAndCreateTableWithDelay(tableDefinition, 
tblManagerFut, null);
     }
 
@@ -521,13 +512,13 @@ public class TableManagerTest extends IgniteAbstractTest {
      * @param tblManagerFut Future for table manager.
      * @param phaser Phaser for the wait.
      * @return Table manager.
-     * @throws NodeStoppingException If something went wrong.
+     * @throws Exception If something went wrong.
      */
     private TableImpl mockManagersAndCreateTableWithDelay(
             TableDefinition tableDefinition,
             CompletableFuture<TableManager> tblManagerFut,
             Phaser phaser
-    ) throws NodeStoppingException {
+    ) throws Exception {
         when(rm.updateRaftGroup(any(), any(), any(), any())).thenAnswer(mock 
-> {
             RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
 
@@ -558,7 +549,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                     .thenReturn(assignment);
         }
 
-        TableManager tableManager = createTableManager(tblManagerFut);
+        TableManager tableManager = createTableManager(tblManagerFut, true);
 
         final int tablesBeforeCreation = tableManager.tables().size();
 
@@ -580,12 +571,35 @@ public class TableManagerTest extends IgniteAbstractTest {
             return CompletableFuture.completedFuture(null);
         });
 
-        TableImpl tbl2 = (TableImpl) 
tableManager.createTable(tableDefinition.canonicalName(),
+        CountDownLatch createTblLatch = new CountDownLatch(1);
+
+        AtomicLong token = new AtomicLong();
+
+        tableManager.listen(TableEvent.CREATE, (parameters, exception) -> {
+
+            createTblLatch.countDown();
+
+            token.set(parameters.causalityToken());
+
+            return true;
+        });
+
+        CompletableFuture<Table> tbl2Fut = 
tableManager.createTableAsync(tableDefinition.canonicalName(),
                 tblCh -> SchemaConfigurationConverter.convert(tableDefinition, 
tblCh)
                         .changeReplicas(REPLICAS)
                         .changePartitions(PARTITIONS)
         );
 
+        assertFalse(tbl2Fut.isDone());
+
+        assertTrue(createTblLatch.await(10, TimeUnit.SECONDS));
+
+        assertFalse(tbl2Fut.isDone());
+
+        tableManager.onSqlSchemaReady(token.get());
+
+        TableImpl tbl2 = (TableImpl) tbl2Fut.get();
+
         assertNotNull(tbl2);
 
         assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
@@ -596,10 +610,12 @@ public class TableManagerTest extends IgniteAbstractTest {
     /**
      * Creates Table manager.
      *
-     * @param tblManagerFut Future to wrap Table manager.
+     * @param tblManagerFut    Future to wrap Table manager.
+     * @param waitingSqlSchema If {@code true}, table creation does not complete until the caller invokes 
{@link TableManager#onSqlSchemaReady(long)};
+     *                         otherwise, a listener invokes it automatically on every table creation event.
      * @return Table manager.
      */
-    private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut) {
+    private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut, boolean waitingSqlSchema) {
         TableManager tableManager = new TableManager(
                 revisionUpdater,
                 tblsCfg,
@@ -613,6 +629,15 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         sm.start();
 
+        // TODO: Remove this auto-notifying listener after IGNITE-17062.
+        if (!waitingSqlSchema) {
+            tableManager.listen(TableEvent.CREATE, (parameters, exception) -> {
+                tableManager.onSqlSchemaReady(parameters.causalityToken());
+
+                return false;
+            });
+        }
+
         tableManager.start();
 
         tblManagerFut.complete(tableManager);

Reply via email to