This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1ce0b24740 IGNITE-20996 Fixed and enabled ItIgniteNodeRestartTest#testCfgGap and ItRebalanceTest#assignmentsChangingOnNodeLeaveNodeJoin (#4121)
1ce0b24740 is described below
commit 1ce0b247401c5e97e39a9130e4649d0448070701
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Aug 5 10:11:14 2024 +0300
IGNITE-20996 Fixed and enabled ItIgniteNodeRestartTest#testCfgGap and ItRebalanceTest#assignmentsChangingOnNodeLeaveNodeJoin (#4121)
---
.../internal/testframework/IgniteTestUtils.java | 26 ++++++++
.../runner/app/ItIgniteNodeRestartTest.java | 78 ++++++++++++++++++++--
.../ignite/internal/rebalance/ItRebalanceTest.java | 28 ++++++--
.../internal/table/ItTxResourcesVacuumTest.java | 24 ++-----
.../internal/table/RecordBinaryViewImpl.java | 13 ++++
5 files changed, 140 insertions(+), 29 deletions(-)
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 749f0b170a..ec232cbf4f 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -49,6 +49,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
@@ -947,6 +950,29 @@ public final class IgniteTestUtils {
}
}
+ /**
+ * Run the closure in the given executor, wait for the result and get it synchronously.
+ *
+ * @param executor Executor.
+ * @param closure Closure.
+ * @return Closure result.
+ */
+ public static <T> T runInExecutor(ExecutorService executor, Supplier<T>
closure) {
+ Object[] arr = new Object[1];
+
+ Future f = executor.submit(() -> {
+ arr[0] = closure.get();
+ });
+
+ try {
+ f.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ return (T) arr[0];
+ }
+
/**
* Predicate matcher.
*
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 0637c23cc6..159702e537 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -35,6 +35,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThr
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertionsAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runInExecutor;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -64,6 +65,8 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -119,6 +122,7 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.ClockServiceImpl;
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -176,8 +180,10 @@ import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
+import org.apache.ignite.internal.table.RecordBinaryViewImpl;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
@@ -188,7 +194,9 @@ import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionC
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -200,9 +208,11 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.worker.CriticalWorkerWatchdog;
import
org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration;
+import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -213,6 +223,7 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
@@ -280,6 +291,10 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
*/
private final Map<Integer, Supplier<CompletableFuture<Set<String>>>>
dataNodesMockByNode = new ConcurrentHashMap<>();
+ private final ExecutorService storageExecutor =
Executors.newSingleThreadExecutor(
+ IgniteThreadFactory.create("test", "storage-test-pool-iinrt", log,
ThreadOperation.STORAGE_READ)
+ );
+
@BeforeEach
public void beforeTest() {
metaStorageInvokeInterceptorByNode.clear();
@@ -1359,11 +1374,10 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
* The test for node restart when there is a gap between the node local configuration and distributed configuration.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20996")
public void testCfgGap() {
List<IgniteImpl> nodes = startNodes(4);
- createTableWithData(nodes, "t1", nodes.size());
+ createTableWithData(nodes, "t1", nodes.size(), 1);
log.info("Stopping the node.");
@@ -1373,7 +1387,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
checkTableWithData(nodes.get(0), "t1");
- createTableWithData(nodes, "t2", nodes.size());
+ createTableWithData(nodes, "t2", nodes.size(), 1);
log.info("Starting the node.");
@@ -1382,8 +1396,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
checkTableWithData(nodes.get(0), "t1");
checkTableWithData(nodes.get(0), "t2");
- checkTableWithData(newNode, "t1");
- checkTableWithData(newNode, "t2");
+ checkTableWithDataOnSpecificNode(newNode, "t1", 100, 0);
+ checkTableWithDataOnSpecificNode(newNode, "t2", 100, 0);
}
/**
@@ -1877,7 +1891,59 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
}
/**
- * Checks the table exists and validates all data in it.
+ * Checks the table exists and validates all data in the storage of specific node.
+ *
+ * @param ignite Ignite.
+ * @param name Table name.
+ * @param recordsCount Total count of records to check.
+ * @param partNum Partition to check.
+ */
+ private void checkTableWithDataOnSpecificNode(Ignite ignite, String name,
int recordsCount, int partNum) {
+ TableImpl table = unwrapTableImpl(ignite.tables().table(name));
+ RecordBinaryViewImpl view = Wrappers.unwrap(table.recordView(),
RecordBinaryViewImpl.class);
+
+ AtomicInteger foundRecords = new AtomicInteger();
+
+ Supplier<Boolean> allDataPresent = () -> {
+ foundRecords.set(0);
+
+ try (Cursor<ReadResult> cursor =
table.internalTable().storage().getMvPartition(partNum).scan(HybridTimestamp.MAX_VALUE))
{
+ while (cursor.hasNext()) {
+ ReadResult rr = cursor.next();
+
+ if (!rr.isWriteIntent()) {
+ Tuple tuple = view.binaryRowToTuple(null,
rr.binaryRow()).join();
+ if (tuple == null) {
+ return false;
+ } else {
+ int id = tuple.intValue("id");
+ assertEquals(VALUE_PRODUCER.apply(id),
tuple.stringValue("name"));
+ foundRecords.incrementAndGet();
+ }
+ }
+ }
+ }
+
+ return foundRecords.get() == recordsCount;
+ };
+
+ try {
+ Awaitility.with()
+ .await()
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .pollDelay(0, TimeUnit.MILLISECONDS)
+ .atMost(3, TimeUnit.SECONDS)
+ .until(() -> runInExecutor(storageExecutor,
allDataPresent));
+ } catch (AssertionError | ConditionTimeoutException e) {
+ log.error("Found records in partitions: " + foundRecords.get());
+
+ throw e;
+ }
+ }
+
+ /**
+ * Checks the table exists and validates all data in it. Important: it checks availability of the data, not the presence of
+ * the data in a storage of specific node.
*
* @param ignite Ignite.
* @param name Table name.
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 538a95f0f6..ecf77eb130 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.rebalance;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
@@ -39,19 +40,21 @@ import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -83,16 +86,19 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest {
*
* @throws Exception If failed.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20996")
@Test
void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
- cluster.startAndInit(4);
+ cluster.startAndInit(4, builder -> builder.clusterConfiguration("{\n"
+ + " \"replication\": {\n"
+ + " \"rpcTimeout\": 8000 \n"
+ + " }"
+ + "}"));
createZone("TEST_ZONE", 1, 3);
// Creates table with 1 partition and 3 replicas.
createTestTable("TEST_TABLE", "TEST_ZONE");
- TableViewInternal table = (TableViewInternal)
cluster.node(0).tables().table("TEST_TABLE");
+ TableViewInternal table =
unwrapTableViewInternal(cluster.node(0).tables().table("TEST_TABLE"));
waitForStableAssignmentsInMetastore(Set.of(
nodeName(0),
@@ -101,7 +107,7 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest {
), table.tableId());
BinaryRowEx row = marshalTuple(table, Tuple.create().set("id",
1).set("val", "value1"));
- BinaryRowEx key = marshalTuple(table, Tuple.create().set("id", 1));
+ BinaryRowEx key = marshalKey(table, 1, Integer.class);
assertThat(table.internalTable().get(key, clock.now(),
cluster.node(0).node()), willBe(nullValue()));
assertThat(table.internalTable().get(key, clock.now(),
cluster.node(1).node()), willBe(nullValue()));
@@ -155,6 +161,18 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest {
return marshaller.marshal(tuple);
}
+ private static <K> Row marshalKey(TableViewInternal table, K key, Class<K>
keyClass) {
+ SchemaRegistry schemaReg = table.schemaView();
+ var marshaller = new KvMarshallerImpl<>(
+ schemaReg.lastKnownSchema(),
+ new ReflectionMarshallersProvider(),
+ Mapper.of(keyClass),
+ Mapper.of(Void.class)
+ );
+
+ return marshaller.marshal(key);
+ }
+
private void waitForStableAssignmentsInMetastore(Set<String>
expectedNodes, int table) throws InterruptedException {
Set<String>[] lastAssignmentsHolderForLog = new Set[1];
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index f4b6ebc77e..5ba0b25ff9 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runInExecutor;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
@@ -44,10 +45,8 @@ import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -109,7 +108,9 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
+ " compute.threadPoolSize: 1\n"
+ "}";
- private ExecutorService txStateStorageExecutor =
Executors.newSingleThreadExecutor();
+ private final ExecutorService txStateStorageExecutor =
Executors.newSingleThreadExecutor(
+ IgniteThreadFactory.create("test",
"tx-state-storage-test-pool-itrvt", log, ThreadOperation.STORAGE_READ)
+ );
@BeforeEach
@Override
@@ -124,9 +125,6 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
});
-
- txStateStorageExecutor =
Executors.newSingleThreadExecutor(IgniteThreadFactory.create("test",
"tx-state-storage-test-pool", log,
- ThreadOperation.STORAGE_READ));
}
@Override
@@ -1045,23 +1043,13 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
@Nullable
private TransactionMeta persistentTxState(IgniteImpl node, UUID txId,
String tableName, int partId) {
- TransactionMeta[] meta = new TransactionMeta[1];
-
- Future f = txStateStorageExecutor.submit(() -> {
+ return runInExecutor(txStateStorageExecutor, () -> {
TxStateStorage txStateStorage = table(node,
tableName).internalTable().txStateStorage().getTxStateStorage(partId);
assertNotNull(txStateStorage);
- meta[0] = txStateStorage.get(txId);
+ return txStateStorage.get(txId);
});
-
- try {
- f.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
-
- return meta[0];
}
private IgniteImpl findNode(Predicate<IgniteImpl> filter) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 8c1102484e..4295dba0e3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -455,6 +455,19 @@ public class RecordBinaryViewImpl extends AbstractTableView<Tuple> implements Re
});
}
+ /**
+ * Unmarshal a row to tuple. Test-only public method.
+ *
+ * @param tx Transaction, if present.
+ * @param row Binary row.
+ * @return A future, with tuple as a result.
+ */
+ @TestOnly
+ @VisibleForTesting
+ public CompletableFuture<Tuple> binaryRowToTuple(@Nullable Transaction tx,
BinaryRow row) {
+ return doOperation(tx, schemaVersion -> completedFuture(wrap(row,
schemaVersion)));
+ }
+
/**
* Returns table row tuple.
*