This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2d597d6738 IGNITE-20635 Cleanup code wrt IGNITE-18733 mentions (#2686)
2d597d6738 is described below
commit 2d597d6738781c7e3a08e2a3231c49ccf5cc1682
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 13 16:32:30 2023 +0400
IGNITE-20635 Cleanup code wrt IGNITE-18733 mentions (#2686)
---
.../Table/SchemaSynchronizationTest.cs | 40 --------------
.../runner/app/ItIgniteNodeRestartTest.java | 62 ++++++++++++++--------
.../sql/engine/ClusterPerClassIntegrationTest.java | 1 -
.../internal/sql/engine/ItBuildIndexTest.java | 6 +--
4 files changed, 41 insertions(+), 68 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index bdcc689cb9..c7abd1a87d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -73,7 +73,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
// Modify table, insert data - client will use old schema, receive
error, retry with new schema.
// The process is transparent for the user: updated schema is in
effect immediately.
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} DROP
COLUMN NAME");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
var rec2 = new IgniteTuple
{
@@ -132,7 +131,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
// then force reload schema and retry.
// The process is transparent for the user: updated schema is in
effect immediately.
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name2'");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
var rec2 = new IgniteTuple
{
@@ -181,7 +179,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
// Modify table, insert data - client will use old schema, receive
error, retry with new schema.
// The process is transparent for the user: updated schema is in
effect immediately.
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
switch (testMode)
{
@@ -228,7 +225,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
await view.InsertAsync(null, rec);
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
var pocoView = table.GetRecordView<Poco>();
var poco = new Poco(1, string.Empty);
@@ -279,7 +275,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
await view.InsertAsync(null, rec);
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
var pocoView = table.GetRecordView<Poco>();
@@ -328,7 +323,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
await view.InsertAsync(null, rec);
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
var pocoView = table.GetKeyValueView<int, string>();
var res = await pocoView.GetAsync(null, 1);
@@ -373,7 +367,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
// Update schema.
// New schema has a new column with a default value, so it is not
required to provide it in the streamed data.
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName}
ADD COLUMN VAL varchar DEFAULT 'FOO'");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
for (int i = 10; i < 20; i++)
{
@@ -395,7 +388,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
// Update schema.
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN VAL varchar DEFAULT 'FOO'");
- await WaitForNewSchemaOnAllNodes(TestTableName, 2);
// Stream data with new schema. Client does not yet know about the new
schema,
// but unmapped column exception will trigger schema reload.
@@ -410,37 +402,5 @@ public class SchemaSynchronizationTest : IgniteTestsBase
Assert.AreEqual("BAR", res2.Value["VAL"]);
}
- private async Task WaitForNewSchemaOnAllNodes(string tableName, int
schemaVer, int timeoutMs = 5000)
- {
- // TODO IGNITE-18733, IGNITE-18449: remove this workaround when issues
are fixed.
- // Currently new schema version is not immediately available on all
nodes.
- // Use separate client to check schema sync without affecting the
system under test.
- var configs = Client.Configuration.Endpoints.Select(e => new
IgniteClientConfiguration(e)).ToList();
-
- foreach (var cfg in configs)
- {
- using var client = await IgniteClient.StartAsync(cfg);
- var table = await client.Tables.GetTableAsync(tableName);
- var tableImpl = (Table)table!;
- var sw = Stopwatch.StartNew();
-
- while (true)
- {
- var schema = await
tableImpl.GetSchemaAsync(Apache.Ignite.Internal.Table.Table.SchemaVersionForceLatest);
- if (schema.Version >= schemaVer)
- {
- break;
- }
-
- if (sw.Elapsed > TimeSpan.FromMilliseconds(timeoutMs))
- {
- Assert.Fail($"Schema version {schema.Version} is not
available on node {cfg.Endpoints[0]} after {timeoutMs}ms");
- }
-
- await Task.Delay(50);
- }
- }
- }
-
private record Poco(int Id, string Name);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 93df66ac85..3344d97704 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -17,8 +17,9 @@
package org.apache.ignite.internal.runner.app;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
@@ -59,6 +60,9 @@ import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import
org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -637,11 +641,11 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
int intRes;
try (Session session1 = ignite1.sql().createSession(); Session
session2 = ignite2.sql().createSession()) {
- session1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME +
"(id)");
+ createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1);
- waitForIndex(List.of(ignite1, ignite2), "idx1");
+ session1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME +
"(id)");
- createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1);
+ waitForIndexToBecomeAvailable(List.of(ignite1, ignite2), "idx1");
ResultSet<SqlRow> plan = session1.execute(null, "EXPLAIN PLAN FOR
" + sql);
@@ -743,7 +747,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* Restarts the node which stores some data.
*/
@Test
- public void nodeWithDataTest() throws InterruptedException {
+ public void nodeWithDataTest() {
IgniteImpl ignite = startNode(0);
createTableWithData(List.of(ignite), TABLE_NAME, 1);
@@ -759,7 +763,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* Restarts the node which stores some data.
*/
@ParameterizedTest
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
@ValueSource(booleans = {true, false})
public void metastorageRecoveryTest(boolean useSnapshot) throws
InterruptedException {
List<IgniteImpl> nodes = startNodes(2);
@@ -881,7 +884,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in the same order when they started at first.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
public void testTwoNodesRestartDirect() throws InterruptedException {
twoNodesRestart(true);
}
@@ -890,7 +892,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in reverse order when they started at first.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
public void testTwoNodesRestartReverse() throws InterruptedException {
twoNodesRestart(false);
}
@@ -900,7 +901,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
*
* @param directOrder When the parameter is true, nodes restart in direct
order, otherwise they restart in reverse order.
*/
- private void twoNodesRestart(boolean directOrder) throws
InterruptedException {
+ private void twoNodesRestart(boolean directOrder) {
List<IgniteImpl> nodes = startNodes(2);
createTableWithData(nodes, TABLE_NAME, 2);
@@ -958,7 +959,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-20137")
- public void testOneNodeRestartWithGap() throws InterruptedException {
+ public void testOneNodeRestartWithGap() {
IgniteImpl ignite = startNode(0);
startNode(1);
@@ -1011,7 +1012,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* Checks that a cluster is able to restart when some changes were made in
configuration.
*/
@Test
- public void testRestartDiffConfig() throws InterruptedException {
+ public void testRestartDiffConfig() {
List<IgniteImpl> ignites = startNodes(2);
createTableWithData(ignites, TABLE_NAME, 2);
@@ -1039,7 +1040,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* The test for node restart when there is a gap between the node local
configuration and distributed configuration.
*/
@Test
- public void testCfgGapWithoutData() throws InterruptedException {
+ public void testCfgGapWithoutData() {
List<IgniteImpl> nodes = startNodes(3);
createTableWithData(nodes, TABLE_NAME, nodes.size());
@@ -1120,8 +1121,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* The test for node restart when there is a gap between the node local
configuration and distributed configuration.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
- public void testCfgGap() throws InterruptedException {
+ public void testCfgGap() {
List<IgniteImpl> nodes = startNodes(4);
createTableWithData(nodes, "t1", nodes.size());
@@ -1236,15 +1236,31 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
}
}
- private void waitForIndex(Collection<IgniteImpl> nodes, String indexName)
throws InterruptedException {
- // FIXME: Wait for the index to be created on all nodes,
- // this is a workaround for
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to
the PK index.
- assertTrue(waitForCondition(
- () -> nodes.stream()
- .map(nodeImpl ->
nodeImpl.catalogManager().index(indexName.toUpperCase(),
nodeImpl.clock().nowLong()))
- .allMatch(Objects::nonNull),
- TIMEOUT_MILLIS
- ));
+ private void waitForIndexToBecomeAvailable(Collection<IgniteImpl> nodes,
String indexName) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(nodes.size());
+
+ nodes.forEach(node ->
node.catalogManager().listen(CatalogEvent.INDEX_AVAILABLE, (event, ex) -> {
+ if (ex != null) {
+ return failedFuture(ex);
+ }
+
+ MakeIndexAvailableEventParameters availableEvent =
(MakeIndexAvailableEventParameters) event;
+
+ CatalogIndexDescriptor index =
node.catalogManager().index(availableEvent.indexId(), event.catalogVersion());
+
+ assertNotNull(index, "Cannot find an index by ID=" +
availableEvent.indexId());
+
+ if (index.name().equalsIgnoreCase(indexName)) {
+ // That's our index.
+ latch.countDown();
+
+ return completedFuture(true);
+ }
+
+ return completedFuture(false);
+ }));
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 8ed419cb29..f91270958a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -450,7 +450,6 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe
protected static Map<Integer, List<Ignite>> waitForIndexBuild(String
tableName, String indexName) throws Exception {
Map<Integer, List<Ignite>> partitionIdToNodes = new HashMap<>();
- // TODO: IGNITE-18733 We are waiting for the synchronization of schemes
for (Ignite clusterNode : CLUSTER_NODES) {
TableImpl tableImpl = getTableImpl(clusterNode, tableName);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
index 2425e508af..bb0da64ae2 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
@@ -188,13 +188,11 @@ public class ItBuildIndexTest extends ClusterPerClassIntegrationTest {
}
/**
- * Waits for all nodes in the cluster to have the given index in the
configuration.
+ * Waits for all nodes in the cluster to have the given index in the
Catalog.
*
- * @param indexName An index.
+ * @param indexName Name of an index to wait for.
*/
private static void waitForIndex(String indexName) throws
InterruptedException {
- // FIXME: Wait for the index to be created on all nodes,
- // this is a workaround for
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to
the index.
assertFalse(nullOrEmpty(CLUSTER_NODES));
assertTrue(waitForCondition(
() -> CLUSTER_NODES.stream().map(node ->
getIndexDescriptor(node, indexName)).allMatch(Objects::nonNull),