This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ce0b24740 IGNITE-20996 Fixed and enabled ItIgniteNodeRestartTest#testCfgGap and ItRebalanceTest#assignmentsChangingOnNodeLeaveNodeJoin (#4121)
1ce0b24740 is described below

commit 1ce0b247401c5e97e39a9130e4649d0448070701
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Aug 5 10:11:14 2024 +0300

    IGNITE-20996 Fixed and enabled ItIgniteNodeRestartTest#testCfgGap and ItRebalanceTest#assignmentsChangingOnNodeLeaveNodeJoin (#4121)
---
 .../internal/testframework/IgniteTestUtils.java    | 26 ++++++++
 .../runner/app/ItIgniteNodeRestartTest.java        | 78 ++++++++++++++++++++--
 .../ignite/internal/rebalance/ItRebalanceTest.java | 28 ++++++--
 .../internal/table/ItTxResourcesVacuumTest.java    | 24 ++-----
 .../internal/table/RecordBinaryViewImpl.java       | 13 ++++
 5 files changed, 140 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 749f0b170a..ec232cbf4f 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -49,6 +49,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
@@ -947,6 +950,29 @@ public final class IgniteTestUtils {
         }
     }
 
+    /**
+     * Run the closure in the given executor, wait for the result and get it 
synchronously.
+     *
+     * @param executor Executor.
+     * @param closure Closure.
+     * @return Closure result.
+     */
+    public static <T> T runInExecutor(ExecutorService executor, Supplier<T> 
closure) {
+        Object[] arr = new Object[1];
+
+        Future f = executor.submit(() -> {
+            arr[0] = closure.get();
+        });
+
+        try {
+            f.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        return (T) arr[0];
+    }
+
     /**
      * Predicate matcher.
      *
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 0637c23cc6..159702e537 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -35,6 +35,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThr
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertionsAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runInExecutor;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -64,6 +65,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -119,6 +122,7 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -176,8 +180,10 @@ import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModule;
 import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
+import org.apache.ignite.internal.table.RecordBinaryViewImpl;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
@@ -188,7 +194,9 @@ import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionC
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.thread.ThreadOperation;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -200,9 +208,11 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.worker.CriticalWorkerWatchdog;
 import 
org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration;
+import org.apache.ignite.internal.wrapper.Wrappers;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -213,6 +223,7 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
 import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
 import org.intellij.lang.annotations.Language;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
@@ -280,6 +291,10 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      */
     private final Map<Integer, Supplier<CompletableFuture<Set<String>>>> 
dataNodesMockByNode = new ConcurrentHashMap<>();
 
+    private final ExecutorService storageExecutor = 
Executors.newSingleThreadExecutor(
+            IgniteThreadFactory.create("test", "storage-test-pool-iinrt", log, 
ThreadOperation.STORAGE_READ)
+    );
+
     @BeforeEach
     public void beforeTest() {
         metaStorageInvokeInterceptorByNode.clear();
@@ -1359,11 +1374,10 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      * The test for node restart when there is a gap between the node local 
configuration and distributed configuration.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20996")
     public void testCfgGap() {
         List<IgniteImpl> nodes = startNodes(4);
 
-        createTableWithData(nodes, "t1", nodes.size());
+        createTableWithData(nodes, "t1", nodes.size(), 1);
 
         log.info("Stopping the node.");
 
@@ -1373,7 +1387,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         checkTableWithData(nodes.get(0), "t1");
 
-        createTableWithData(nodes, "t2", nodes.size());
+        createTableWithData(nodes, "t2", nodes.size(), 1);
 
         log.info("Starting the node.");
 
@@ -1382,8 +1396,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         checkTableWithData(nodes.get(0), "t1");
         checkTableWithData(nodes.get(0), "t2");
 
-        checkTableWithData(newNode, "t1");
-        checkTableWithData(newNode, "t2");
+        checkTableWithDataOnSpecificNode(newNode, "t1", 100, 0);
+        checkTableWithDataOnSpecificNode(newNode, "t2", 100, 0);
     }
 
     /**
@@ -1877,7 +1891,59 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
     }
 
     /**
-     * Checks the table exists and validates all data in it.
+     * Checks the table exists and validates all data in the storage of 
specific node.
+     *
+     * @param ignite Ignite.
+     * @param name Table name.
+     * @param recordsCount Total count of records to check.
+     * @param partNum Partition to check.
+     */
+    private void checkTableWithDataOnSpecificNode(Ignite ignite, String name, 
int recordsCount, int partNum) {
+        TableImpl table = unwrapTableImpl(ignite.tables().table(name));
+        RecordBinaryViewImpl view = Wrappers.unwrap(table.recordView(), 
RecordBinaryViewImpl.class);
+
+        AtomicInteger foundRecords = new AtomicInteger();
+
+        Supplier<Boolean> allDataPresent = () -> {
+            foundRecords.set(0);
+
+            try (Cursor<ReadResult> cursor = 
table.internalTable().storage().getMvPartition(partNum).scan(HybridTimestamp.MAX_VALUE))
 {
+                while (cursor.hasNext()) {
+                    ReadResult rr = cursor.next();
+
+                    if (!rr.isWriteIntent()) {
+                        Tuple tuple = view.binaryRowToTuple(null, 
rr.binaryRow()).join();
+                        if (tuple == null) {
+                            return false;
+                        } else {
+                            int id = tuple.intValue("id");
+                            assertEquals(VALUE_PRODUCER.apply(id), 
tuple.stringValue("name"));
+                            foundRecords.incrementAndGet();
+                        }
+                    }
+                }
+            }
+
+            return foundRecords.get() == recordsCount;
+        };
+
+        try {
+            Awaitility.with()
+                    .await()
+                    .pollInterval(100, TimeUnit.MILLISECONDS)
+                    .pollDelay(0, TimeUnit.MILLISECONDS)
+                    .atMost(3, TimeUnit.SECONDS)
+                    .until(() -> runInExecutor(storageExecutor, 
allDataPresent));
+        } catch (AssertionError | ConditionTimeoutException e) {
+            log.error("Found records in partitions: " + foundRecords.get());
+
+            throw e;
+        }
+    }
+
+    /**
+     * Checks the table exists and validates all data in it. Important: it 
checks availability of the data, not the presence of
+     * the data in a storage of specific node.
      *
      * @param ignite Ignite.
      * @param name Table name.
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 538a95f0f6..ecf77eb130 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.rebalance;
 
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
@@ -39,19 +40,21 @@ import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -83,16 +86,19 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest 
{
      *
      * @throws Exception If failed.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20996")
     @Test
     void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
-        cluster.startAndInit(4);
+        cluster.startAndInit(4, builder -> builder.clusterConfiguration("{\n"
+                + "    \"replication\": {\n"
+                + "        \"rpcTimeout\": 8000 \n"
+                + "    }"
+                + "}"));
 
         createZone("TEST_ZONE", 1, 3);
         // Creates table with 1 partition and 3 replicas.
         createTestTable("TEST_TABLE", "TEST_ZONE");
 
-        TableViewInternal table = (TableViewInternal) 
cluster.node(0).tables().table("TEST_TABLE");
+        TableViewInternal table = 
unwrapTableViewInternal(cluster.node(0).tables().table("TEST_TABLE"));
 
         waitForStableAssignmentsInMetastore(Set.of(
                 nodeName(0),
@@ -101,7 +107,7 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest 
{
         ), table.tableId());
 
         BinaryRowEx row = marshalTuple(table, Tuple.create().set("id", 
1).set("val", "value1"));
-        BinaryRowEx key = marshalTuple(table, Tuple.create().set("id", 1));
+        BinaryRowEx key = marshalKey(table, 1, Integer.class);
 
         assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(0).node()), willBe(nullValue()));
         assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(1).node()), willBe(nullValue()));
@@ -155,6 +161,18 @@ public class ItRebalanceTest extends 
BaseIgniteAbstractTest {
         return marshaller.marshal(tuple);
     }
 
+    private static <K> Row marshalKey(TableViewInternal table, K key, Class<K> 
keyClass) {
+        SchemaRegistry schemaReg = table.schemaView();
+        var marshaller = new KvMarshallerImpl<>(
+                schemaReg.lastKnownSchema(),
+                new ReflectionMarshallersProvider(),
+                Mapper.of(keyClass),
+                Mapper.of(Void.class)
+        );
+
+        return marshaller.marshal(key);
+    }
+
     private void waitForStableAssignmentsInMetastore(Set<String> 
expectedNodes, int table) throws InterruptedException {
         Set<String>[] lastAssignmentsHolderForLog = new Set[1];
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index f4b6ebc77e..5ba0b25ff9 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runInExecutor;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
@@ -44,10 +45,8 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -109,7 +108,9 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
             + "  compute.threadPoolSize: 1\n"
             + "}";
 
-    private ExecutorService txStateStorageExecutor = 
Executors.newSingleThreadExecutor();
+    private final ExecutorService txStateStorageExecutor = 
Executors.newSingleThreadExecutor(
+            IgniteThreadFactory.create("test", 
"tx-state-storage-test-pool-itrvt", log, ThreadOperation.STORAGE_READ)
+    );
 
     @BeforeEach
     @Override
@@ -124,9 +125,6 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
             executeUpdate(zoneSql, session);
             executeUpdate(sql, session);
         });
-
-        txStateStorageExecutor = 
Executors.newSingleThreadExecutor(IgniteThreadFactory.create("test", 
"tx-state-storage-test-pool", log,
-                ThreadOperation.STORAGE_READ));
     }
 
     @Override
@@ -1045,23 +1043,13 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
     @Nullable
     private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, 
String tableName, int partId) {
-        TransactionMeta[] meta = new TransactionMeta[1];
-
-        Future f = txStateStorageExecutor.submit(() -> {
+        return runInExecutor(txStateStorageExecutor, () -> {
             TxStateStorage txStateStorage = table(node, 
tableName).internalTable().txStateStorage().getTxStateStorage(partId);
 
             assertNotNull(txStateStorage);
 
-            meta[0] = txStateStorage.get(txId);
+            return txStateStorage.get(txId);
         });
-
-        try {
-            f.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
-
-        return meta[0];
     }
 
     private IgniteImpl findNode(Predicate<IgniteImpl> filter) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 8c1102484e..4295dba0e3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -455,6 +455,19 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
         });
     }
 
+    /**
+     * Unmarshal a row to tuple. Test-only public method.
+     *
+     * @param tx Transaction, if present.
+     * @param row Binary row.
+     * @return A future, with tuple as a result.
+     */
+    @TestOnly
+    @VisibleForTesting
+    public CompletableFuture<Tuple> binaryRowToTuple(@Nullable Transaction tx, 
BinaryRow row) {
+        return doOperation(tx, schemaVersion -> completedFuture(wrap(row, 
schemaVersion)));
+    }
+
     /**
      * Returns table row tuple.
      *

Reply via email to