This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5d0d4e349c9 IGNITE-22400 Fixed flaky IgniteClusterSnapshotStreamerTest
(#11369)
5d0d4e349c9 is described below
commit 5d0d4e349c9c8f1fc8dbbcd53e54e7e19dc45923
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Jun 4 10:16:25 2024 +0300
IGNITE-22400 Fixed flaky IgniteClusterSnapshotStreamerTest (#11369)
---
.../SnapshotPartitionsQuickVerifyHandler.java | 35 +++++++++++++---------
.../IgniteClusterSnapshotStreamerTest.java | 26 ++++++++++++----
2 files changed, 41 insertions(+), 20 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
index e378dec9b35..cce4e5d3057 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
@@ -27,7 +27,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
@@ -71,28 +70,36 @@ public class SnapshotPartitionsQuickVerifyHandler extends
SnapshotPartitionsVeri
String name,
Collection<SnapshotHandlerResult<Map<PartitionKeyV2,
PartitionHashRecordV2>>> results
) throws IgniteCheckedException {
+ boolean noData = false;
+
+ Set<Integer> wrnGrps = new HashSet<>();
+
+ Map<PartitionKeyV2, PartitionHashRecordV2> total = new HashMap<>();
+
for (SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>>
result : results) {
if (result.error() != null)
throw new IgniteCheckedException(result.error());
- }
- if (results.stream().anyMatch(r -> r.data() == null))
- return;
+ if (result.data() == null) {
+ noData = true;
- Set<Integer> wrnGrps = new HashSet<>();
+ continue;
+ }
- Map<PartitionKeyV2, PartitionHashRecordV2> total = new HashMap<>();
+ Map<PartitionKeyV2, PartitionHashRecordV2> partsData =
result.data();
- F.viewReadOnly(results, SnapshotHandlerResult::data).forEach(m ->
m.forEach((part, val) -> {
- PartitionHashRecordV2 other = total.putIfAbsent(part, val);
+ partsData.forEach((part, val) -> {
+ PartitionHashRecordV2 other = total.putIfAbsent(part, val);
- if (other == null)
- return;
+ if ((other != null && !wrnGrps.contains(part.groupId()))
+ && ((!val.hasExpiringEntries() &&
!other.hasExpiringEntries() && val.size() != other.size())
+ || !Objects.equals(val.updateCounter(),
other.updateCounter())))
+ wrnGrps.add(part.groupId());
+ });
+ }
- if ((!val.hasExpiringEntries() && !other.hasExpiringEntries() &&
val.size() != other.size())
- || !Objects.equals(val.updateCounter(), other.updateCounter()))
- wrnGrps.add(part.groupId());
- }));
+ if (noData)
+ return;
if (!wrnGrps.isEmpty()) {
throw new SnapshotWarningException("Cache partitions differ for
cache groups " +
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
index 4072689c819..2d0b0e65bb4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -30,6 +30,7 @@ import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -63,6 +64,9 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
/** */
private static final String INMEM_DATA_REGION = "inMemDr";
+ /** */
+ private static final int SERVER_CNT = 3;
+
/** */
private IgniteEx client;
@@ -83,7 +87,7 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
dfltCacheCfg.setBackups(2);
- startGrids(3);
+ startGrids(SERVER_CNT);
grid(0).cluster().state(ACTIVE);
@@ -95,6 +99,8 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
client = startClientGrid(G.allGrids().size());
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
grid(0).createCache(dfltCacheCfg);
}
@@ -408,6 +414,7 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
stopLoading.set(true);
loadFut.cancel();
+ waitForCondition(loadFut::isDone, getTestTimeout());
if (allowOverwrite)
createAndCheckSnapshot(snpHnd, true, null, null);
@@ -424,9 +431,10 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
* @param allowOverwrite 'allowOverwrite' setting.
* @param stop Stop load flag.
*/
- private IgniteInternalFuture<?> runLoad(Ignite ldr, boolean
allowOverwrite, AtomicBoolean stop)
- throws InterruptedException {
- CountDownLatch preload = new CountDownLatch(10_000);
+ private IgniteInternalFuture<?> runLoad(Ignite ldr, boolean
allowOverwrite, AtomicBoolean stop) throws InterruptedException {
+ int preload = SERVER_CNT * dfltCacheCfg.getAffinity().partitions();
+
+ CountDownLatch proceed = new CountDownLatch(1);
IgniteInternalFuture<?> res = runAsync(() -> {
try (IgniteDataStreamer<Integer, Object> ds =
ldr.dataStreamer(dfltCacheCfg.getName())) {
@@ -437,12 +445,18 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
while (!stop.get()) {
ds.addData(++idx, idx);
- preload.countDown();
+ if (idx == preload) {
+ ds.flush();
+
+ proceed.countDown();
+ }
}
+
+ proceed.countDown();
}
}, "load-thread");
- preload.await();
+ proceed.await();
return res;
}