This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 44c56eda3 IGNITE-16918 Sql. Races during table creation (#838)
44c56eda3 is described below
commit 44c56eda38427206e933b4e36ed69d50e8e612ea
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Jun 2 15:52:42 2022 +0300
IGNITE-16918 Sql. Races during table creation (#838)
---
.../ignite/internal/causality/VersionedValue.java | 4 +
.../internal/causality/VersionedValueTest.java | 99 ++++++++++++------
.../runner/app/ItIgniteNodeRestartTest.java | 8 +-
.../ignite/internal/schema/SchemaManager.java | 4 -
.../ignite/internal/schema/event/SchemaEvent.java | 5 +-
.../sql/engine/schema/SqlSchemaManagerImpl.java | 116 ++++++++++-----------
.../engine/exec/schema/SqlSchemaManagerTest.java | 13 +++
.../internal/table/distributed/TableManager.java | 53 +++++-----
.../ignite/internal/table/TableManagerTest.java | 75 ++++++++-----
9 files changed, 221 insertions(+), 156 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
index 6e43431ff..10b3df8c3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
@@ -285,6 +285,8 @@ public class VersionedValue<T> {
checkToken(actualToken0, causalityToken);
completeInternal(causalityToken, value, null);
+
+ completeOnRevision(causalityToken);
}
/**
@@ -304,6 +306,8 @@ public class VersionedValue<T> {
checkToken(actualToken0, causalityToken);
completeInternal(causalityToken, null, throwable);
+
+ completeOnRevision(causalityToken);
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
index 791e1138c..14fac51c5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.IntStream;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteTriConsumer;
import org.junit.jupiter.api.BeforeEach;
@@ -53,6 +54,9 @@ public class VersionedValueTest {
/** Test value. */
public static final int TEST_VALUE = 1;
+ /** Test exception used to exceptionally complete a Versioned value
object. */
+ public static final Exception TEST_EXCEPTION = new Exception("Test
exception.");
+
/** The test revision register is used to move the revision forward. */
public static final TestRevisionRegister REGISTER = new
TestRevisionRegister();
@@ -68,7 +72,7 @@ public class VersionedValueTest {
*/
@Test
public void testGetValueBeforeReady() throws OutdatedTokenException {
- VersionedValue<Integer> intVersionedValue = new
VersionedValue<>(REGISTER, 2, null);
+ VersionedValue<Integer> intVersionedValue = new VersionedValue<>(null);
CompletableFuture<Integer> fut = intVersionedValue.get(0);
@@ -76,8 +80,6 @@ public class VersionedValueTest {
intVersionedValue.complete(0L, TEST_VALUE);
- REGISTER.moveRevision(0L).join();
-
assertTrue(fut.isDone());
assertEquals(TEST_VALUE, fut.join());
@@ -85,6 +87,50 @@ public class VersionedValueTest {
assertSame(fut.join(), intVersionedValue.get(0).join());
}
+ /**
+ * Test checks completion of several sequential updates.
+ */
+ @Test
+ public void testManualCompleteSeveralTokens() {
+ VersionedValue<Integer> intVersionedValue = new VersionedValue<>(null);
+
+ IntStream.range(5, 10).forEach(token -> {
+ CompletableFuture<Integer> fut = intVersionedValue.get(token);
+
+ assertFalse(fut.isDone());
+
+ intVersionedValue.complete(token, TEST_VALUE);
+
+ assertTrue(fut.isDone());
+
+ assertEquals(TEST_VALUE, fut.join());
+
+ assertSame(fut.join(), intVersionedValue.get(token).join());
+ });
+ }
+
+ /**
+ * Test checks exceptional completion of several sequential updates.
+ */
+ @Test
+ public void testManualExceptionallyCompleteSeveralTokens() {
+ VersionedValue<Integer> intVersionedValue = new VersionedValue<>(null);
+
+ IntStream.range(5, 10).forEach(token -> {
+ CompletableFuture<Integer> fut = intVersionedValue.get(token);
+
+ assertFalse(fut.isDone());
+
+ intVersionedValue.completeExceptionally(token, TEST_EXCEPTION);
+
+ assertTrue(fut.isDone());
+
+ assertThrows(Exception.class, fut::get);
+
+ assertThrows(Exception.class, () ->
intVersionedValue.get(token).get());
+ });
+ }
+
/**
* The test explicitly sets a value to {@link VersionedValue} without
waiting for the revision update.
*
@@ -115,17 +161,15 @@ public class VersionedValueTest {
*/
@Test
public void testMissValueUpdateBeforeReady() throws OutdatedTokenException
{
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
+ VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(null);
longVersionedValue.complete(0, TEST_VALUE);
- REGISTER.moveRevision(0L).join();
-
CompletableFuture<Integer> fut = longVersionedValue.get(1);
assertFalse(fut.isDone());
- REGISTER.moveRevision(1L).join();
+ longVersionedValue.complete(1);
assertTrue(fut.isDone());
@@ -142,12 +186,11 @@ public class VersionedValueTest {
*/
@Test
public void testMissValueUpdate() throws OutdatedTokenException {
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
+ VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(null);
longVersionedValue.complete(0, TEST_VALUE);
- REGISTER.moveRevision(0L).join();
- REGISTER.moveRevision(1L).join();
+ longVersionedValue.complete(1);
CompletableFuture<Integer> fut = longVersionedValue.get(1);
@@ -163,16 +206,12 @@ public class VersionedValueTest {
*/
@Test
public void testObsoleteToken() {
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
+ VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(null);
longVersionedValue.complete(0, TEST_VALUE);
-
- REGISTER.moveRevision(0L).join();
-
longVersionedValue.complete(1, TEST_VALUE);
- REGISTER.moveRevision(1L).join();
- REGISTER.moveRevision(2L).join();
+ longVersionedValue.complete(2);
assertThrowsExactly(OutdatedTokenException.class, () ->
longVersionedValue.get(0));
}
@@ -182,18 +221,16 @@ public class VersionedValueTest {
*/
@Test
public void testAutocompleteFuture() throws OutdatedTokenException {
- VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
+ VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(null);
longVersionedValue.complete(0, TEST_VALUE);
- REGISTER.moveRevision(0L).join();
-
CompletableFuture<Integer> fut = longVersionedValue.get(1);
assertFalse(fut.isDone());
- REGISTER.moveRevision(1L).join();
- REGISTER.moveRevision(2L).join();
+ longVersionedValue.complete(1);
+ longVersionedValue.complete(2);
assertTrue(fut.isDone());
assertTrue(longVersionedValue.get(2).isDone());
@@ -208,7 +245,7 @@ public class VersionedValueTest {
public void testUpdate() throws Exception {
VersionedValue<Integer> longVersionedValue = new
VersionedValue<>(REGISTER);
- longVersionedValue.complete(0, TEST_VALUE);
+ longVersionedValue.update(0, (integer, throwable) ->
CompletableFuture.completedFuture(TEST_VALUE));
REGISTER.moveRevision(0L).join();
@@ -465,7 +502,7 @@ public class VersionedValueTest {
*/
@Test
public void testWhenComplete() {
- VersionedValue<Integer> vv = new VersionedValue<>(REGISTER);
+ VersionedValue<Integer> vv = new VersionedValue<>(null);
AtomicInteger a = new AtomicInteger();
AtomicInteger cntr = new AtomicInteger(-1);
@@ -490,13 +527,11 @@ public class VersionedValueTest {
vv.complete(token, TEST_VALUE);
assertThrows(AssertionError.class, () -> vv.complete(finalToken0, 0));
- assertThrows(AssertionError.class, () ->
vv.completeExceptionally(finalToken0, new Exception()));
+ assertThrows(AssertionError.class, () ->
vv.completeExceptionally(finalToken0, TEST_EXCEPTION));
assertEquals(TEST_VALUE, a.get());
assertEquals(token, cntr.get());
- REGISTER.moveRevision(token).join();
-
// Test update.
token = 1;
@@ -504,7 +539,7 @@ public class VersionedValueTest {
assertEquals(TEST_VALUE, a.get());
- REGISTER.moveRevision(token).join();
+ vv.complete(token);
assertEquals(TEST_VALUE + 1, a.get());
assertEquals(token, cntr.get());
@@ -512,7 +547,7 @@ public class VersionedValueTest {
// Test move revision.
token = 2;
- REGISTER.moveRevision(token).join();
+ vv.complete(token);
assertEquals(TEST_VALUE + 1, a.get());
assertEquals(token, cntr.get());
@@ -522,16 +557,14 @@ public class VersionedValueTest {
final long finalToken3 = token;
- vv.completeExceptionally(token, new Exception());
+ vv.completeExceptionally(token, TEST_EXCEPTION);
assertThrows(AssertionError.class, () -> vv.complete(finalToken3, 0));
- assertThrows(AssertionError.class, () ->
vv.completeExceptionally(finalToken3, new Exception()));
+ assertThrows(AssertionError.class, () ->
vv.completeExceptionally(finalToken3, TEST_EXCEPTION));
assertEquals(-1, a.get());
assertEquals(token, cntr.get());
- REGISTER.moveRevision(token).join();
-
assertEquals(token, cntr.get());
// Test remove listener.
@@ -545,8 +578,6 @@ public class VersionedValueTest {
assertEquals(0, a.get());
assertEquals(token - 1, cntr.get());
-
- REGISTER.moveRevision(token).join();
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f7d77980f..d2ac1d9ec 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -71,6 +72,7 @@ import
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
import org.apache.ignite.internal.rest.RestComponent;
import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
@@ -279,6 +281,9 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
schemaManager
);
+ //TODO: Get rid of it after IGNITE-17062.
+ SqlQueryProcessor queryProcessor = new SqlQueryProcessor(registry,
clusterSvc, tableManager, dataStorageManager, Map::of);
+
// Preparing the result map.
res.add(vault);
@@ -313,7 +318,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
clusterCfgMgr,
dataStorageManager,
schemaManager,
- tableManager
+ tableManager,
+ queryProcessor
);
for (IgniteComponent component : otherComponents) {
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 791a22b6a..de2ccfa96 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -77,10 +77,6 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
this.registriesVv = new VersionedValue<>(registry, HashMap::new);
this.tablesCfg = tablesCfg;
-
- registriesVv.whenComplete((token, registries, e) -> {
- fireEvent(SchemaEvent.COMPLETE, new SchemaEventParameters(token,
null, null), e);
- });
}
/** {@inheritDoc} */
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
index 22b2cca82..04d51deb8 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
@@ -24,8 +24,5 @@ import org.apache.ignite.internal.manager.Event;
*/
public enum SchemaEvent implements Event {
/** This event is fired when a schema was created. */
- CREATE,
-
- /** This event is fired when new schema manager revision is complete. */
- COMPLETE
+ CREATE
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 5a6a74132..be2ed7b89 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -72,13 +72,19 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
schemasVv = new VersionedValue<>(registry, HashMap::new);
tablesVv = new VersionedValue<>(registry, HashMap::new);
- calciteSchemaVv = new VersionedValue<>(registry, () -> {
+ calciteSchemaVv = new VersionedValue<>(null, () -> {
SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new
IgniteSchema(DEFAULT_SCHEMA_NAME));
return newCalciteSchema;
});
- calciteSchemaVv.whenComplete((token, schema, e) ->
listeners.forEach(SchemaUpdateListener::onSchemaUpdated));
+ schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> {
+ rebuild(token, stringIgniteSchemaMap);
+
+ listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
+
+ tableManager.onSqlSchemaReady(token);
+ });
}
/** {@inheritDoc} */
@@ -143,22 +149,20 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
* @param causalityToken Causality token.
*/
public synchronized void onSchemaCreated(String schemaName, long
causalityToken) {
- CompletableFuture<Map<String, IgniteSchema>> schemasMapFut =
schemasVv.update(
+ schemasVv.update(
causalityToken,
(schemas, e) -> {
if (e != null) {
return failedFuture(e);
}
- Map<String, IgniteSchema> res = new HashMap<>(schemas);
+ Map<String, IgniteSchema> res = new HashMap<>(schemas);
res.putIfAbsent(schemaName, new IgniteSchema(schemaName));
return completedFuture(res);
}
);
-
- rebuild(causalityToken, schemasMapFut);
}
/**
@@ -168,22 +172,20 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
* @param causalityToken Causality token.
*/
public synchronized void onSchemaDropped(String schemaName, long
causalityToken) {
- CompletableFuture<Map<String, IgniteSchema>> schemasMapFut =
schemasVv.update(
- causalityToken,
- (schemas, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
+ schemasVv.update(
+ causalityToken,
+ (schemas, e) -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
- Map<String, IgniteSchema> res = new HashMap<>(schemas);
+ Map<String, IgniteSchema> res = new HashMap<>(schemas);
- res.remove(schemaName);
+ res.remove(schemaName);
- return completedFuture(res);
- }
+ return completedFuture(res);
+ }
);
-
- rebuild(causalityToken, schemasMapFut);
}
/**
@@ -195,7 +197,7 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
TableImpl table,
long causalityToken
) {
- CompletableFuture<Map<String, IgniteSchema>> schemasMapFut =
schemasVv.update(
+ schemasVv.update(
causalityToken,
(schemas, e) -> {
if (e != null) {
@@ -211,25 +213,23 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
schema.addTable(removeSchema(schemaName, table.name()),
igniteTable);
return tablesVv
- .update(
- causalityToken,
- (tables, ex) -> {
- if (ex != null) {
- return failedFuture(ex);
- }
+ .update(
+ causalityToken,
+ (tables, ex) -> {
+ if (ex != null) {
+ return failedFuture(ex);
+ }
- Map<UUID, IgniteTable> resTbls = new
HashMap<>(tables);
+ Map<UUID, IgniteTable> resTbls = new
HashMap<>(tables);
- resTbls.put(igniteTable.id(), igniteTable);
+ resTbls.put(igniteTable.id(),
igniteTable);
- return completedFuture(resTbls);
- }
- )
- .thenCompose(tables -> completedFuture(res));
+ return completedFuture(resTbls);
+ }
+ )
+ .thenCompose(tables -> completedFuture(res));
}
);
-
- rebuild(causalityToken, schemasMapFut);
}
/**
@@ -253,7 +253,7 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
String tableName,
long causalityToken
) {
- CompletableFuture<Map<String, IgniteSchema>> schemasMapFut =
schemasVv.update(causalityToken,
+ schemasVv.update(causalityToken,
(schemas, e) -> {
if (e != null) {
return failedFuture(e);
@@ -271,45 +271,41 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
schema.removeTable(calciteTableName);
return tablesVv
- .update(causalityToken,
- (tables, ex) -> {
- if (ex != null) {
- return failedFuture(ex);
- }
+ .update(causalityToken,
+ (tables, ex) -> {
+ if (ex != null) {
+ return failedFuture(ex);
+ }
- Map<UUID, IgniteTable> resTbls = new
HashMap<>(tables);
+ Map<UUID, IgniteTable> resTbls =
new HashMap<>(tables);
- resTbls.remove(table.id());
+ resTbls.remove(table.id());
- return completedFuture(resTbls);
- }
- )
- .thenCompose(tables -> completedFuture(res));
+ return completedFuture(resTbls);
+ }
+ )
+ .thenCompose(tables -> completedFuture(res));
}
return completedFuture(res);
}
);
-
- rebuild(causalityToken, schemasMapFut);
}
- private void rebuild(long causalityToken, CompletableFuture<Map<String,
IgniteSchema>> schemasFut) {
- schemasFut.thenCompose(schemas -> {
- SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
-
- newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
+ /**
+ * Rebuilds Calcite schemas.
+ *
+ * @param causalityToken Causality token.
+ * @param schemas Ignite schemas.
+ */
+ private void rebuild(long causalityToken, Map<String, IgniteSchema>
schemas) {
+ SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
- schemas.forEach(newCalciteSchema::add);
+ newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
- return calciteSchemaVv.update(causalityToken, (s, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
+ schemas.forEach(newCalciteSchema::add);
- return completedFuture(newCalciteSchema);
- });
- });
+ calciteSchemaVv.complete(causalityToken, newCalciteSchema);
}
private IgniteTableImpl convert(TableImpl table) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index 79dd5df1a..58f8f20cb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -25,6 +25,7 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -105,6 +106,9 @@ public class SqlSchemaManagerTest {
assertThat(ex.getMessage(), containsString("Table not found"));
Mockito.verify(tableManager).table(eq(tblId));
+
+ Mockito.verify(tableManager, times(1)).onSqlSchemaReady(anyLong());
+
Mockito.verifyNoMoreInteractions(tableManager);
}
@@ -125,6 +129,9 @@ public class SqlSchemaManagerTest {
assertEquals(tableId, actTable.id());
Mockito.verify(tableManager).table(eq(tableId));
+
+ Mockito.verify(tableManager, times(1)).onSqlSchemaReady(anyLong());
+
Mockito.verifyNoMoreInteractions(tableManager);
}
@@ -147,6 +154,8 @@ public class SqlSchemaManagerTest {
assertEquals(tableId, actTable.id());
+ Mockito.verify(tableManager, times(2)).onSqlSchemaReady(anyLong());
+
Mockito.verifyNoMoreInteractions(tableManager);
}
@@ -169,6 +178,8 @@ public class SqlSchemaManagerTest {
assertEquals(tableId, actTable.id());
+ Mockito.verify(tableManager, times(2)).onSqlSchemaReady(anyLong());
+
Mockito.verifyNoMoreInteractions(tableManager);
}
@@ -197,6 +208,8 @@ public class SqlSchemaManagerTest {
assertThat(ex.getMessage(), containsString("Table version not found"));
Mockito.verify(tableManager, times(2)).table(eq(tableId));
+ Mockito.verify(tableManager, times(2)).onSqlSchemaReady(anyLong());
+
Mockito.verifyNoMoreInteractions(tableManager);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 61179919b..b7d57e846 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -37,7 +37,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -192,12 +191,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
clusterNodeResolver = topologyService::getByAddress;
tablesByIdVv = new VersionedValue<>(null, HashMap::new);
-
- this.schemaManager.listen(SchemaEvent.COMPLETE, (parameters, e) -> {
- tablesByIdVv.complete(parameters.causalityToken());
-
- return false;
- });
}
/** {@inheritDoc} */
@@ -228,19 +221,30 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
schemaManager.listen(SchemaEvent.CREATE, new EventListener<>() {
/** {@inheritDoc} */
- @Override public boolean notify(@NotNull SchemaEventParameters
parameters, @Nullable Throwable exception) {
-
tablesByIdVv.get(parameters.causalityToken()).thenAccept(tablesById -> {
+ @Override
+ public boolean notify(@NotNull SchemaEventParameters parameters,
@Nullable Throwable exception) {
+ if (tablesByIdVv.latest().get(parameters.tableId()) != null) {
fireEvent(
TableEvent.ALTER,
- new
TableEventParameters(parameters.causalityToken(),
tablesById.get(parameters.tableId())), null
+ new
TableEventParameters(parameters.causalityToken(),
tablesByIdVv.latest().get(parameters.tableId())), null
);
- });
+ }
return false;
}
});
}
+ /**
+ * Completes all table futures.
+ * TODO: Get rid of it after IGNITE-17062.
+ *
+ * @param causalityToken Causality token.
+ */
+ public void onSqlSchemaReady(long causalityToken) {
+ tablesByIdVv.complete(causalityToken);
+ }
+
/**
* Listener of table create configuration change.
*
@@ -447,8 +451,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
var table = new TableImpl(internalTable);
- CompletableFuture<Void> schemaFut =
schemaManager.schemaRegistry(causalityToken,
tblId).thenAccept(table::schemaView);
-
tablesByIdVv.update(causalityToken, (previous, e) -> {
if (e != null) {
return failedFuture(e);
@@ -461,14 +463,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return completedFuture(val);
});
- // TODO should be reworked in IGNITE-16763
- return tablesByIdVv.get(causalityToken)
- .thenCompose(v -> schemaFut)
- .thenRun(() -> {
- fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, table), null);
+ schemaManager.schemaRegistry(causalityToken, tblId)
+ .thenAccept(table::schemaView)
+ .thenRun(() -> fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, table), null));
- completeApiCreateFuture(table);
- });
+ // TODO should be reworked in IGNITE-16763
+ return tablesByIdVv.get(causalityToken).thenRun(() ->
completeApiCreateFuture(table));
}
/**
@@ -502,8 +502,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
raftMgr.stopRaftGroup(raftGroupName(tblId, p));
}
- AtomicReference<TableImpl> tableHolder = new AtomicReference<>();
-
tablesByIdVv.update(causalityToken, (previousVal, e) -> {
if (e != null) {
return failedFuture(e);
@@ -511,16 +509,15 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
var map = new HashMap<>(previousVal);
- TableImpl table = map.remove(tblId);
-
- tableHolder.set(table);
+ map.remove(tblId);
return completedFuture(map);
});
- TableImpl table = tableHolder.get();
+ TableImpl table = tablesByIdVv.latest().get(tblId);
- assert table != null : "There is no table with the name specified
[name=" + name + ']';
+ assert table != null : IgniteStringFormatter.format("There is no
table with the name specified [name={}, id={}]",
+ name, tblId);
table.internalTable().storage().destroy();
@@ -1046,7 +1043,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
if (e == null) {
- getTblFut.complete(parameters.table());
+ tablesByIdVv.get(parameters.causalityToken()).thenRun(()
-> getTblFut.complete(parameters.table()));
} else {
getTblFut.completeExceptionally(e);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index c60f0ef47..2d24d0511 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -43,7 +43,10 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.configuration.NamedListView;
@@ -79,6 +82,7 @@ import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMe
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageConfigurationSchema;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
@@ -231,20 +235,7 @@ public class TableManagerTest extends IgniteAbstractTest {
when(rm.updateRaftGroup(any(), any(), any(), any())).thenAnswer(mock ->
CompletableFuture.completedFuture(mock(RaftGroupService.class)));
- TableManager tableManager = new TableManager(
- revisionUpdater,
- tblsCfg,
- rm,
- bm,
- ts,
- tm,
- dsm = createDataStorageManager(configRegistry, workDir,
pageMemoryEngineConfig),
- sm = new SchemaManager(revisionUpdater, tblsCfg)
- );
-
- sm.start();
-
- tableManager.start();
+ TableManager tableManager = createTableManager(tblManagerFut, false);
tblManagerFut.complete(tableManager);
@@ -339,7 +330,7 @@ public class TableManagerTest extends IgniteAbstractTest {
*/
@Test
public void testApiTableManagerOnStop() {
- createTableManager(tblManagerFut);
+ createTableManager(tblManagerFut, false);
TableManager tableManager = tblManagerFut.join();
@@ -390,7 +381,7 @@ public class TableManagerTest extends IgniteAbstractTest {
*/
@Test
public void testInternalApiTableManagerOnStop() {
- createTableManager(tblManagerFut);
+ createTableManager(tblManagerFut, false);
TableManager tableManager = tblManagerFut.join();
@@ -443,7 +434,7 @@ public class TableManagerTest extends IgniteAbstractTest {
CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(()
-> {
try {
return mockManagersAndCreateTableWithDelay(scmTbl,
tblManagerFut, phaser);
- } catch (NodeStoppingException e) {
+ } catch (Exception e) {
fail(e.getMessage());
}
@@ -505,12 +496,12 @@ public class TableManagerTest extends IgniteAbstractTest {
* @param tableDefinition Configuration schema for a table.
* @param tblManagerFut Future for table manager.
* @return Table.
- * @throws NodeStoppingException If something went wrong.
+ * @throws Exception If something went wrong.
*/
private TableImpl mockManagersAndCreateTable(
TableDefinition tableDefinition,
CompletableFuture<TableManager> tblManagerFut
- ) throws NodeStoppingException {
+ ) throws Exception {
return mockManagersAndCreateTableWithDelay(tableDefinition,
tblManagerFut, null);
}
@@ -521,13 +512,13 @@ public class TableManagerTest extends IgniteAbstractTest {
* @param tblManagerFut Future for table manager.
* @param phaser Phaser for the wait.
* @return Table manager.
- * @throws NodeStoppingException If something went wrong.
+ * @throws Exception If something went wrong.
*/
private TableImpl mockManagersAndCreateTableWithDelay(
TableDefinition tableDefinition,
CompletableFuture<TableManager> tblManagerFut,
Phaser phaser
- ) throws NodeStoppingException {
+ ) throws Exception {
when(rm.updateRaftGroup(any(), any(), any(), any())).thenAnswer(mock
-> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
@@ -558,7 +549,7 @@ public class TableManagerTest extends IgniteAbstractTest {
.thenReturn(assignment);
}
- TableManager tableManager = createTableManager(tblManagerFut);
+ TableManager tableManager = createTableManager(tblManagerFut, true);
final int tablesBeforeCreation = tableManager.tables().size();
@@ -580,12 +571,35 @@ public class TableManagerTest extends IgniteAbstractTest {
return CompletableFuture.completedFuture(null);
});
- TableImpl tbl2 = (TableImpl)
tableManager.createTable(tableDefinition.canonicalName(),
+ CountDownLatch createTblLatch = new CountDownLatch(1);
+
+ AtomicLong token = new AtomicLong();
+
+ tableManager.listen(TableEvent.CREATE, (parameters, exception) -> {
+
+ createTblLatch.countDown();
+
+ token.set(parameters.causalityToken());
+
+ return true;
+ });
+
+ CompletableFuture<Table> tbl2Fut =
tableManager.createTableAsync(tableDefinition.canonicalName(),
tblCh -> SchemaConfigurationConverter.convert(tableDefinition,
tblCh)
.changeReplicas(REPLICAS)
.changePartitions(PARTITIONS)
);
+ assertFalse(tbl2Fut.isDone());
+
+ assertTrue(createTblLatch.await(10, TimeUnit.SECONDS));
+
+ assertFalse(tbl2Fut.isDone());
+
+ tableManager.onSqlSchemaReady(token.get());
+
+ TableImpl tbl2 = (TableImpl) tbl2Fut.get();
+
assertNotNull(tbl2);
assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
@@ -596,10 +610,12 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Creates Table manager.
*
- * @param tblManagerFut Future to wrap Table manager.
+ * @param tblManagerFut Future to wrap Table manager.
+ * @param waitingSqlSchema If the flag is true, table creation will wait for the
{@link TableManager#onSqlSchemaReady(long)} invocation before
+ * completing; otherwise, there will be no waiting.
* @return Table manager.
*/
- private TableManager createTableManager(CompletableFuture<TableManager>
tblManagerFut) {
+ private TableManager createTableManager(CompletableFuture<TableManager>
tblManagerFut, boolean waitingSqlSchema) {
TableManager tableManager = new TableManager(
revisionUpdater,
tblsCfg,
@@ -613,6 +629,15 @@ public class TableManagerTest extends IgniteAbstractTest {
sm.start();
+ //TODO: Get rid of it after IGNITE-17062.
+ if (!waitingSqlSchema) {
+ tableManager.listen(TableEvent.CREATE, (parameters, exception) -> {
+ tableManager.onSqlSchemaReady(parameters.causalityToken());
+
+ return false;
+ });
+ }
+
tableManager.start();
tblManagerFut.complete(tableManager);