This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d0d4e349c9 IGNITE-22400 Fixed flaky IgniteClusterSnapshotStreamerTest 
(#11369)
5d0d4e349c9 is described below

commit 5d0d4e349c9c8f1fc8dbbcd53e54e7e19dc45923
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Jun 4 10:16:25 2024 +0300

    IGNITE-22400 Fixed flaky IgniteClusterSnapshotStreamerTest (#11369)
---
 .../SnapshotPartitionsQuickVerifyHandler.java      | 35 +++++++++++++---------
 .../IgniteClusterSnapshotStreamerTest.java         | 26 ++++++++++++----
 2 files changed, 41 insertions(+), 20 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
index e378dec9b35..cce4e5d3057 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
@@ -27,7 +27,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.management.cache.PartitionKeyV2;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -71,28 +70,36 @@ public class SnapshotPartitionsQuickVerifyHandler extends 
SnapshotPartitionsVeri
         String name,
         Collection<SnapshotHandlerResult<Map<PartitionKeyV2, 
PartitionHashRecordV2>>> results
     ) throws IgniteCheckedException {
+        boolean noData = false;
+
+        Set<Integer> wrnGrps = new HashSet<>();
+
+        Map<PartitionKeyV2, PartitionHashRecordV2> total = new HashMap<>();
+
         for (SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>> 
result : results) {
             if (result.error() != null)
                 throw new IgniteCheckedException(result.error());
-        }
 
-        if (results.stream().anyMatch(r -> r.data() == null))
-            return;
+            if (result.data() == null) {
+                noData = true;
 
-        Set<Integer> wrnGrps = new HashSet<>();
+                continue;
+            }
 
-        Map<PartitionKeyV2, PartitionHashRecordV2> total = new HashMap<>();
+            Map<PartitionKeyV2, PartitionHashRecordV2> partsData = 
result.data();
 
-        F.viewReadOnly(results, SnapshotHandlerResult::data).forEach(m -> 
m.forEach((part, val) -> {
-            PartitionHashRecordV2 other = total.putIfAbsent(part, val);
+            partsData.forEach((part, val) -> {
+                PartitionHashRecordV2 other = total.putIfAbsent(part, val);
 
-            if (other == null)
-                return;
+                if ((other != null && !wrnGrps.contains(part.groupId()))
+                    && ((!val.hasExpiringEntries() && 
!other.hasExpiringEntries() && val.size() != other.size())
+                        || !Objects.equals(val.updateCounter(), 
other.updateCounter())))
+                    wrnGrps.add(part.groupId());
+            });
+        }
 
-            if ((!val.hasExpiringEntries() && !other.hasExpiringEntries() && 
val.size() != other.size())
-                || !Objects.equals(val.updateCounter(), other.updateCounter()))
-                wrnGrps.add(part.groupId());
-        }));
+        if (noData)
+            return;
 
         if (!wrnGrps.isEmpty()) {
             throw new SnapshotWarningException("Cache partitions differ for 
cache groups " +
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
index 4072689c819..2d0b0e65bb4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -30,6 +30,7 @@ import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -63,6 +64,9 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
     /** */
     private static final String INMEM_DATA_REGION = "inMemDr";
 
+    /** */
+    private static final int SERVER_CNT = 3;
+
     /** */
     private IgniteEx client;
 
@@ -83,7 +87,7 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
 
         dfltCacheCfg.setBackups(2);
 
-        startGrids(3);
+        startGrids(SERVER_CNT);
 
         grid(0).cluster().state(ACTIVE);
 
@@ -95,6 +99,8 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
 
         client = startClientGrid(G.allGrids().size());
 
+        dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
         grid(0).createCache(dfltCacheCfg);
     }
 
@@ -408,6 +414,7 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
 
         stopLoading.set(true);
         loadFut.cancel();
+        waitForCondition(loadFut::isDone, getTestTimeout());
 
         if (allowOverwrite)
             createAndCheckSnapshot(snpHnd, true, null, null);
@@ -424,9 +431,10 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
      * @param allowOverwrite 'allowOverwrite' setting.
      * @param stop Stop load flag.
      */
-    private IgniteInternalFuture<?> runLoad(Ignite ldr, boolean 
allowOverwrite, AtomicBoolean stop)
-        throws InterruptedException {
-        CountDownLatch preload = new CountDownLatch(10_000);
+    private IgniteInternalFuture<?> runLoad(Ignite ldr, boolean 
allowOverwrite, AtomicBoolean stop) throws InterruptedException {
+        int preload = SERVER_CNT * dfltCacheCfg.getAffinity().partitions();
+
+        CountDownLatch proceed = new CountDownLatch(1);
 
         IgniteInternalFuture<?> res = runAsync(() -> {
             try (IgniteDataStreamer<Integer, Object> ds = 
ldr.dataStreamer(dfltCacheCfg.getName())) {
@@ -437,12 +445,18 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
                 while (!stop.get()) {
                     ds.addData(++idx, idx);
 
-                    preload.countDown();
+                    if (idx == preload) {
+                        ds.flush();
+
+                        proceed.countDown();
+                    }
                 }
+
+                proceed.countDown();
             }
         }, "load-thread");
 
-        preload.await();
+        proceed.await();
 
         return res;
     }

Reply via email to