This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 687307b7db2 Ignite-17793 : Historical rebalance must use HWM instead of LWM (#10396)
687307b7db2 is described below
commit 687307b7db263f881654b9aed8f396024ac8d2aa
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed Dec 7 16:43:40 2022 +0300
Ignite-17793 : Historical rebalance must use HWM instead of LWM (#10396)
---
.../cache/IgniteCacheOffheapManager.java | 5 +
.../cache/IgniteCacheOffheapManagerImpl.java | 5 +
.../processors/cache/PartitionUpdateCounter.java | 7 +
.../cache/PartitionUpdateCounterDebugWrapper.java | 5 +
.../cache/PartitionUpdateCounterErrorWrapper.java | 5 +
.../cache/PartitionUpdateCounterTrackingImpl.java | 4 +-
.../cache/PartitionUpdateCounterVolatileImpl.java | 5 +
.../dht/topology/GridDhtLocalPartition.java | 8 +
.../cache/persistence/GridCacheOffheapManager.java | 12 +
.../persistence/checkpoint/CheckpointWorkflow.java | 2 +-
.../HistoricalRebalanceCheckpointTest.java | 453 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
12 files changed, 511 insertions(+), 3 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index d2061645645..db8823128e8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -669,6 +669,11 @@ public interface IgniteCacheOffheapManager {
*/
long updateCounter();
+ /**
+ * Returns the highest update counter applied to this store. When updates were applied out of order
+ * (with missed counters in between), this value may be greater than {@link #updateCounter()}.
+ * The checkpoint workflow records this value (through the owning partition) as the partition counter.
+ *
+ * @return Highest applied update counter.
+ */
+ long highestAppliedCounter();
+
/**
* @return Reserved counter (HWM).
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e6f78780e5f..a26e9c6df14 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1599,6 +1599,11 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
return pCntr.get();
}
+ /** {@inheritDoc} */
+ @Override public long highestAppliedCounter() {
+ return pCntr.highestAppliedCounter();
+ }
+
/** {@inheritDoc} */
@Override public long reservedCounter() {
return pCntr.reserved();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index f37bf4c26dd..2475770b6ca 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -80,6 +80,13 @@ public interface PartitionUpdateCounter extends
Iterable<long[]> {
*/
public long reserved();
+ /**
+ * Returns the highest applied update counter. Unlike the low watermark, it also covers updates that were
+ * applied out of order, past counters that are still missing. Implementations without gap tracking may simply
+ * return the current counter value.
+ *
+ * @return Highest applied counter.
+ */
+ public long highestAppliedCounter();
+
/**
* Sets update counter to absolute value. All missed updates will be
discarded.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
index 87cc5803fc7..39aeb92b63a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
@@ -237,6 +237,11 @@ public class PartitionUpdateCounterDebugWrapper implements
PartitionUpdateCounte
return delegate.get();
}
+ /** {@inheritDoc} */
+ @Override public long highestAppliedCounter() {
+ return delegate.highestAppliedCounter();
+ }
+
/** {@inheritDoc} */
@Override public long reserved() {
return delegate.reserved();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
index e08317c0315..6e2201891cf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
@@ -146,6 +146,11 @@ public class PartitionUpdateCounterErrorWrapper implements
PartitionUpdateCounte
return delegate.reserved();
}
+ /** {@inheritDoc} */
+ @Override public long highestAppliedCounter() {
+ return delegate.highestAppliedCounter();
+ }
+
/** {@inheritDoc} */
@Nullable @Override public byte[] getBytes() {
return delegate.getBytes();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index ed700945108..b778a1393e2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -119,8 +119,8 @@ public class PartitionUpdateCounterTrackingImpl implements
PartitionUpdateCounte
return lwm.get();
}
- /** */
- protected synchronized long highestAppliedCounter() {
+ /** {@inheritDoc} */
+ @Override public synchronized long highestAppliedCounter() {
return queue.isEmpty() ? lwm.get() :
queue.lastEntry().getValue().absolute();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
index 054ecb6df8b..7a05918e360 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
@@ -148,6 +148,11 @@ public class PartitionUpdateCounterVolatileImpl implements
PartitionUpdateCounte
return get();
}
+    /** {@inheritDoc} */
+    @Override public long highestAppliedCounter() {
+        // This implementation reports its current counter value as the highest applied one.
+        return this.get();
+    }
+
/** {@inheritDoc} */
@Override public boolean empty() {
return get() == 0;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index d6b6206aeb3..245f580dc4d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -911,6 +911,14 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
return store.updateCounter();
}
+    /**
+     * Returns the highest applied update counter as reported by the partition data store.
+     *
+     * @return Highest applied update counter.
+     */
+    public long highestAppliedCounter() {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-18343: Refactor partition counters API.
+        return this.store.highestAppliedCounter();
+    }
+
/**
* @return Current reserved counter (HWM).
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 64bc8928ec6..7622305cce4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2446,6 +2446,18 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
}
}
+    /** {@inheritDoc} */
+    @Override public long highestAppliedCounter() {
+        CacheDataStore delegate0;
+
+        try {
+            delegate0 = init0(true);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+
+        // init0() may yield no delegate store; zero is reported in that case.
+        return delegate0 != null ? delegate0.highestAppliedCounter() : 0;
+    }
+
/** {@inheritDoc} */
@Override public long reservedCounter() {
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
index 19a8c72a2e5..4fda960a3ae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
@@ -399,7 +399,7 @@ public class CheckpointWorkflow {
state.addPartitionState(
part.id(),
part.dataStore().fullSize(),
- part.updateCounter(),
+ part.highestAppliedCounter(),
(byte)partState.ordinal()
);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceCheckpointTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceCheckpointTest.java
new file mode 100644
index 00000000000..34cc306e41d
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceCheckpointTest.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import com.google.common.base.Functions;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests partition consistency after a historical rebalance when transaction requests and responses are lost.
+ */
+public class HistoricalRebalanceCheckpointTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Every test starts from empty persistent storage.
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Stop the nodes before the base class cleanup, and do not leave persistence files
+        // behind for the tests that run after this one in the suite.
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // Lets the tests block and await transaction and rebalance messages.
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            // FSYNC allows the blockable file IO below to be used for WAL writes as well.
+            .setWalMode(WALMode.FSYNC)
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setMaxSize(50L * 1024 * 1024).setPersistenceEnabled(true)
+            );
+
+        // Wraps the default file IO so that writes can be failed on demand.
+        dsCfg.setFileIOFactory(new BlockableFileIOFactory(dsCfg.getFileIOFactory()));
+
+        cfg.setDataStorageConfiguration(dsCfg);
+
+        // Helps to kill nodes on stop with disabled IO.
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+        return cfg;
+    }
+
+    /**
+     * Three nodes (two backups, two-phase commit): prepare and finish requests to the backups are delayed.
+     */
+    @Test
+    public void testDelayedToBackupsRequests2Backups() throws Exception {
+        doTestDelayedToBackupsRequests(3, false);
+    }
+
+    /**
+     * Three nodes (two backups, two-phase commit): prepare and finish requests to the backups are delayed,
+     * then more updates are done on top of the resulting gaps.
+     */
+    @Test
+    public void testDelayedToBackupsRequests2BackupsMorePuts() throws Exception {
+        doTestDelayedToBackupsRequests(3, true);
+    }
+
+    /**
+     * Two nodes (one backup, one-phase commit): prepare requests to the backup are delayed.
+     */
+    @Test
+    public void testDelayedToBackupsRequests1Backup() throws Exception {
+        doTestDelayedToBackupsRequests(2, false);
+    }
+
+    /**
+     * Two nodes (one backup, one-phase commit): prepare requests to the backup are delayed, then more updates
+     * are done on top of the resulting gaps.
+     */
+    @Test
+    public void testDelayedToBackupsRequests1BackupMorePuts() throws Exception {
+        doTestDelayedToBackupsRequests(2, true);
+    }
+
+ /**
+ * Tests one-phase commit (one backup) when the prepare responses from the backup to the primary are lost.
+ * The backup ends up with the updates and the primary without them. After the backup restarts, the primary
+ * must not send any demand/supply message (no rebalance), and the partition copies must be consistent.
+ */
+ @Test
+ public void testDelayed1PhaseCommitResponses() throws Exception {
+ final int preloadCnt = 2_000;
+
+ // 2 nodes: the single partition has one primary and one backup copy.
+ prepareCluster(2, preloadCnt);
+
+ Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+
+ Ignite backup = backupNodes(0L, DEFAULT_CACHE_NAME).get(0);
+
+ AtomicBoolean prepareBlock = new AtomicBoolean();
+
+ AtomicReference<CountDownLatch> blockLatch = new AtomicReference<>();
+
+ // While prepareBlock is set, blocks every prepare response sent by the backup and counts the latch down.
+ TestRecordingCommunicationSpi.spi(backup).blockMessages(new
IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtTxPrepareResponse &&
prepareBlock.get()) {
+ CountDownLatch latch = blockLatch.get();
+
+ latch.countDown();
+
+ return true;
+ }
+ else
+ return false;
+ }
+ });
+
+ IgniteCache<Integer, Integer> primCache =
prim.cache(DEFAULT_CACHE_NAME);
+
+ // Futures of the asynchronous puts are intentionally not awaited.
+ Consumer<Integer> cachePutAsync = (key) -> GridTestUtils.runAsync(()
-> primCache.put(key, key));
+
+ prepareBlock.set(true);
+
+ // One blocked prepare response per put.
+ blockLatch.set(new CountDownLatch(20));
+
+ int updateCnt = preloadCnt;
+
+ for (int i = 0; i < 20; i++)
+ cachePutAsync.accept(++updateCnt);
+
+ blockLatch.get().await();
+
+ // Storing the highest counters on backup.
+ forceCheckpoint();
+
+ IdleVerifyResultV2 checkRes = idleVerify(prim, DEFAULT_CACHE_NAME);
+
+ // Counter conflicts keyed by whether the record belongs to the primary copy.
+ Map<Boolean, PartitionHashRecordV2> conflicts =
F.flatCollections(checkRes.counterConflicts().values())
+
.stream().collect(Collectors.toMap(PartitionHashRecordV2::isPrimary,
Functions.identity()));
+
+ // The cache is of only 1 partition with 2 nodes: primary and backup.
+ assertEquals(2, conflicts.size());
+
+ // Ensure the backup node got a higher counter: the primary stays at the preloaded value,
+ // the backup has all the blocked updates without gaps.
+ assertCounters(conflicts.get(true).updateCounter(), preloadCnt, null,
preloadCnt);
+ assertCounters(conflicts.get(false).updateCounter(), updateCnt, null,
updateCnt);
+
+ String backName = backup.name();
+
+ backup.close();
+
+ // Any demand/supply message from the primary after the backup returns would mean a rebalance.
+ TestRecordingCommunicationSpi.spi(prim).blockMessages((n, m) -> m
instanceof GridDhtPartitionDemandMessage ||
+ m instanceof GridDhtPartitionSupplyMessage
+ );
+
+ startGrid(backName);
+
+ awaitPartitionMapExchange();
+
+ // Primary commits transactions when the backup node leaves. Ensure no rebalance occurs.
+ assertFalse(TestRecordingCommunicationSpi.spi(prim).waitForBlocked(1,
5_000));
+
+ checkRes = idleVerify(prim, DEFAULT_CACHE_NAME);
+ assertFalse(checkRes.hasConflicts());
+ }
+
+    /**
+     * Starts {@code nodes} nodes and activates the cluster. Creates a single-partition transactional cache with
+     * {@code nodes - 1} backups and preloads it, then restarts all the nodes.
+     *
+     * @param nodes Number of nodes, must be greater than 1.
+     * @param loadCnt Number of entries to preload (keys {@code 0 .. loadCnt - 1}, each value equals its key).
+     * @return Number of backups of the created cache.
+     * @throws Exception If failed.
+     */
+    private int prepareCluster(int nodes, int loadCnt) throws Exception {
+        assert nodes > 1;
+
+        int backupNodes = nodes - 1;
+
+        IgniteEx ignite = startGrids(nodes);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, Integer> cache = ignite.createCache(new CacheConfiguration<Integer, Integer>()
+            .setAffinity(new RendezvousAffinityFunction(false, 1))
+            .setBackups(backupNodes)
+            .setName(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setWriteSynchronizationMode(FULL_SYNC) // Allows to be sure that all messages are sent when put succeed.
+            .setReadFromBackup(true)); // Allows checking values on backups.
+
+        // Initial preloading enough to have historical rebalance.
+        for (int i = 0; i < loadCnt; i++)
+            cache.put(i, i);
+
+        // To have historical rebalance on cluster recovery. Decreases percent of updates in comparison to cache size.
+        stopAllGrids();
+        startGrids(nodes);
+
+        return backupNodes;
+    }
+
+    /**
+     * Tests delayed prepare/finish transaction requests from the primary to the backups.
+     * <p>
+     * The test proceeds as follows:
+     * <ul>
+     *     <li>The primary's prepare requests for a batch of puts are blocked.</li>
+     *     <li>With more than one backup (two-phase commit), the finish requests for another batch are blocked too.</li>
+     *     <li>Optionally, more puts are done on top of the resulting gaps.</li>
+     *     <li>All backups are stopped with failing file IO, and one of them is restarted.</li>
+     * </ul>
+     * Finally, the test checks that the rebalance is historical and that the partition copies have no conflicts.
+     *
+     * @param nodes Nodes number. The backups number is {@code nodes} - 1.
+     * @param putAfterGaps If {@code true}, does more puts to the cache after the simulated gaps.
+     * @throws Exception If failed.
+     */
+    private void doTestDelayedToBackupsRequests(int nodes, boolean putAfterGaps) throws Exception {
+        final int preloadCnt = 2_000;
+        final int prepareBlockCnt = 20;
+        final int finishBlockCnt = 30;
+        final int putsAfterGapsCnt = 50;
+
+        int backupNodes = prepareCluster(nodes, preloadCnt);
+
+        Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+
+        List<Ignite> backups = backupNodes(0L, DEFAULT_CACHE_NAME);
+
+        AtomicBoolean prepareBlock = new AtomicBoolean();
+        AtomicBoolean finishBlock = new AtomicBoolean();
+
+        AtomicReference<CountDownLatch> blockLatch = new AtomicReference<>();
+
+        // Blocks outgoing prepare/finish requests of the primary while the corresponding flag is set.
+        TestRecordingCommunicationSpi.spi(prim).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if ((msg instanceof GridDhtTxPrepareRequest && prepareBlock.get()) ||
+                    (msg instanceof GridDhtTxFinishRequest && finishBlock.get())) {
+                    CountDownLatch latch = blockLatch.get();
+
+                    assertTrue(latch.getCount() > 0);
+
+                    latch.countDown();
+
+                    return true;
+                }
+                else
+                    return false;
+            }
+        });
+
+        IgniteCache<Integer, Integer> primCache = prim.cache(DEFAULT_CACHE_NAME);
+
+        // Futures of the asynchronous puts are intentionally not awaited.
+        Consumer<Integer> cachePutAsync = (key) -> GridTestUtils.runAsync(() -> primCache.put(key, key));
+
+        int updateCnt = preloadCnt;
+
+        try {
+            // Blocked at primary and backups.
+            prepareBlock.set(true);
+
+            // One blocked prepare request per backup for each put.
+            blockLatch.set(new CountDownLatch(backupNodes * prepareBlockCnt));
+
+            for (int i = 0; i < prepareBlockCnt; i++)
+                cachePutAsync.accept(++updateCnt);
+
+            blockLatch.get().await();
+        }
+        finally {
+            prepareBlock.set(false);
+        }
+
+        if (backupNodes > 1) {
+            try {
+                // Blocked at backups only.
+                finishBlock.set(true);
+
+                // One blocked finish request per backup for each put.
+                blockLatch.set(new CountDownLatch(backupNodes * finishBlockCnt));
+
+                for (int i = 0; i < finishBlockCnt; i++)
+                    cachePutAsync.accept(++updateCnt);
+
+                blockLatch.get().await();
+            }
+            finally {
+                finishBlock.set(false);
+            }
+        }
+
+        // Counter of the last update whose requests to the backups were blocked. This is the end of the backups' gap.
+        // It must not be derived from putsAfterGapsCnt, which is unrelated and only coincidentally equal.
+        int lastBlockedCnt = updateCnt;
+
+        if (putAfterGaps) {
+            for (int i = 0; i < putsAfterGapsCnt; i++)
+                primCache.put(++updateCnt, updateCnt);
+        }
+
+        // Storing counters on primary.
+        forceCheckpoint();
+
+        Collection<PartitionHashRecordV2> conflicts =
+            F.flatCollections(idleVerify(prim, DEFAULT_CACHE_NAME).counterConflicts().values());
+
+        // With 1 backup and one-phase commit partitions are the same because there are no finish requests while all
+        // prepare requests were blocked.
+        assertTrue(!conflicts.isEmpty() || backupNodes == 1);
+
+        // Ensure the primary node got a higher counter.
+        for (PartitionHashRecordV2 c : conflicts) {
+            if (c.isPrimary()) {
+                assertCounters(c.updateCounter(), preloadCnt, "" + (preloadCnt + 1) + " - " +
+                    (preloadCnt + prepareBlockCnt), updateCnt);
+            }
+            else {
+                // The backups miss every update whose requests were blocked. The gap is visible only when later
+                // updates arrived on top of it.
+                assertCounters(c.updateCounter(), preloadCnt,
+                    putAfterGaps ? "" + (preloadCnt + 1) + " - " + lastBlockedCnt : null,
+                    putAfterGaps ? updateCnt : preloadCnt);
+            }
+        }
+
+        // Emulating power off, OOM or disk overflow. Keeping data as is, with missed counters updates.
+        backups.forEach(node -> ((BlockableFileIOFactory)node.configuration().getDataStorageConfiguration()
+            .getFileIOFactory()).blocked = true);
+
+        List<String> backNames = backups.stream().map(Ignite::name).collect(Collectors.toList());
+
+        CountDownLatch rebalanceFinished = new CountDownLatch(1);
+
+        backups.forEach(Ignite::close);
+
+        TestRecordingCommunicationSpi.spi(prim).blockMessages(GridDhtPartitionSupplyMessage.class, backNames.get(0));
+
+        // NOTE(review): testLog is not passed to any node configuration in this class. Also, only one backup is
+        // restarted below, while times(backupNodes) matches are expected. Confirm that the listener actually
+        // observes the rebalance log line.
+        ListeningTestLogger testLog = new ListeningTestLogger(prim.log());
+
+        // Ensures the rebalance was historical.
+        LogListener rebalanceLsnr = LogListener.matches("fullPartitions=[], " +
+            "histPartitions=[0]").times(backupNodes).build();
+
+        testLog.registerListener(rebalanceLsnr);
+
+        // Restore just any backup.
+        IgniteEx backup = startGrid(backNames.get(0));
+
+        // Supply messages to the restarted backup stay blocked until the rebalance listener is registered.
+        TestRecordingCommunicationSpi.spi(prim).waitForBlocked();
+
+        backup.events().localListen(evt -> {
+            rebalanceFinished.countDown();
+
+            return true;
+        }, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+        TestRecordingCommunicationSpi.spi(prim).stopBlock();
+
+        rebalanceFinished.await();
+
+        rebalanceLsnr.check();
+
+        IdleVerifyResultV2 checkRes = idleVerify(prim, DEFAULT_CACHE_NAME);
+        assertFalse(checkRes.hasConflicts());
+    }
+
+    /**
+     * Checks a partition update counter against its expected textual form
+     * {@code [lwm=<lwm>, missed=[<missed>], hwm=<hwm>]}.
+     *
+     * @param cntr Actual update counter, as taken from {@link PartitionHashRecordV2#updateCounter()}.
+     * @param lwm Expected low watermark.
+     * @param missed Expected missed ranges, {@code null} or empty if there are no gaps.
+     * @param hwm Expected high watermark.
+     */
+    private static void assertCounters(Object cntr, int lwm, String missed, int hwm) {
+        String exp = "[lwm=" + lwm + ", missed=[" + (F.isEmpty(missed) ? "" : missed) + "], hwm=" + hwm + "]";
+
+        // The expected value goes first, so that a failure message reports expected and actual values correctly.
+        assertEquals(exp, cntr);
+    }
+
+    /**
+     * File IO factory whose writes can be failed on demand. Simulates an IO failure on data writing.
+     */
+    private static class BlockableFileIOFactory implements FileIOFactory {
+        /** IO Factory. */
+        private final FileIOFactory factory;
+
+        /** If {@code true}, every write through IO created by this factory fails with an {@link IOException}. */
+        public volatile boolean blocked;
+
+        /**
+         * @param factory Factory creating the delegate IO.
+         */
+        public BlockableFileIOFactory(FileIOFactory factory) {
+            this.factory = factory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            return new FileIODecorator(factory.create(file, modes)) {
+                /** {@inheritDoc} */
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
+                    failIfBlocked();
+
+                    return super.write(srcBuf);
+                }
+
+                /** {@inheritDoc} */
+                @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+                    failIfBlocked();
+
+                    return super.write(srcBuf, position);
+                }
+
+                /** {@inheritDoc} */
+                @Override public int write(byte[] buf, int off, int len) throws IOException {
+                    failIfBlocked();
+
+                    return super.write(buf, off, len);
+                }
+            };
+        }
+
+        /**
+         * Fails the current write if writes are blocked. The same message is used for every write method.
+         *
+         * @throws IOException If writes are blocked.
+         */
+        private void failIfBlocked() throws IOException {
+            if (blocked)
+                throw new IOException("Simulated IO failure.");
+        }
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 0c5f89371cc..a73b7584c1d 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cdc.CdcCacheVersionTest;
import org.apache.ignite.cdc.CdcSelfTest;
import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
import org.apache.ignite.cdc.WalForCdcTest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceCheckpointTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceTwoPartsInDifferentCheckpointsTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
@@ -198,6 +199,8 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite,
HistoricalRebalanceTwoPartsInDifferentCheckpointsTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
HistoricalRebalanceCheckpointTest.class, ignoredTests);
+
GridTestUtils.addTestIfNeeded(suite, GridFileUtilsTest.class,
ignoredTests);
}
}