This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 57c07ee68d IGNITE-22465 Recover zones' replicas and pending/stable
events on a node restart (#4043)
57c07ee68d is described below
commit 57c07ee68d5d838a25f1a1d6dbefb244dfbf9b0d
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Jul 9 17:07:27 2024 +0400
IGNITE-22465 Recover zones' replicas and pending/stable events on a node
restart (#4043)
---
.../DistributionZoneRebalanceEngineV2.java | 46 ++-
.../rebalance/ZoneRebalanceUtil.java | 32 +-
modules/partition-replicator/build.gradle | 1 +
.../replicator/ItReplicaLifecycleTest.java | 345 ++++++++++++++++-----
.../PartitionReplicaLifecycleManager.java | 142 ++++++++-
.../ignite/internal/replicator/ReplicaManager.java | 6 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../internal/table/distributed/TableManager.java | 2 +-
8 files changed, 469 insertions(+), 106 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
index f409e9b316..1f6b1f53d3 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.distributionzones.rebalance;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
@@ -25,9 +27,11 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.triggerZonePartitionsRebalance;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -108,7 +112,14 @@ public class DistributionZoneRebalanceEngineV2 {
// TODO: IGNITE-18694 - Recovery for the case when zones watch
listener processed event but assignments were not updated.
metaStorageManager.registerPrefixWatch(zoneDataNodesKey(),
dataNodesListener);
- return nullCompletedFuture();
+ CompletableFuture<Long> recoveryFinishFuture =
metaStorageManager.recoveryFinishedFuture();
+
+ // At the moment of the start of this manager, it is guaranteed
that Meta Storage has been recovered.
+ assert recoveryFinishFuture.isDone();
+
+ long recoveryRevision = recoveryFinishFuture.join();
+
+ return rebalanceTriggersRecovery(recoveryRevision);
});
}
@@ -161,7 +172,8 @@ public class DistributionZoneRebalanceEngineV2 {
zoneDescriptor,
filteredDataNodes,
evt.entryEvent().newEntry().revision(),
- metaStorageManager
+ metaStorageManager,
+ busyLock
);
});
}
@@ -206,8 +218,36 @@ public class DistributionZoneRebalanceEngineV2 {
zoneDescriptor,
dataNodes,
causalityToken,
- metaStorageManager
+ metaStorageManager,
+ busyLock
);
}));
}
+
+ /**
+ * Runs the update of the rebalance state stored in the metastore.
+ *
+ * @param recoveryRevision Recovery revision.
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21058 At the moment
this method produces many metastore multi-invokes,
+ // TODO: which can be avoided by local logic that mirrors the logic
of the metastore invokes.
+ // TODO: The remote invoke should then be run only if needed.
+ private CompletableFuture<Void> rebalanceTriggersRecovery(long
recoveryRevision) {
+ if (recoveryRevision > 0) {
+ List<CompletableFuture<Void>> zonesRecoveryFutures =
catalogService.zones(catalogService.latestCatalogVersion())
+ .stream()
+ .map(zoneDesc ->
+
recalculateAssignmentsAndTriggerZonePartitionsRebalance(
+ zoneDesc,
+ recoveryRevision,
+ catalogService.latestCatalogVersion()
+ )
+ )
+ .collect(Collectors.toUnmodifiableList());
+
+ return allOf(zonesRecoveryFutures.toArray(new
CompletableFuture[0]));
+ } else {
+ return completedFuture(null);
+ }
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 23fea49067..61087071bb 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -38,6 +38,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
@@ -62,6 +63,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
/**
@@ -273,7 +275,8 @@ public class ZoneRebalanceUtil {
CatalogZoneDescriptor zoneDescriptor,
Set<String> dataNodes,
long storageRevision,
- MetaStorageManager metaStorageManager
+ MetaStorageManager metaStorageManager,
+ IgniteSpinBusyLock busyLock
) {
CompletableFuture<Map<Integer, Assignments>> zoneAssignmentsFut =
zoneAssignments(
metaStorageManager,
@@ -289,19 +292,20 @@ public class ZoneRebalanceUtil {
int finalPartId = partId;
- partitionFutures[partId] =
zoneAssignmentsFut.thenCompose(zoneAssignments ->
- // TODO https://issues.apache.org/jira/browse/IGNITE-19763
We should distinguish empty stable assignments on
- // TODO node recovery in case of interrupted table
creation, and moving from empty assignments to non-empty.
- zoneAssignments.isEmpty() ? nullCompletedFuture() :
updatePendingAssignmentsKeys(
- zoneDescriptor,
- replicaGrpId,
- dataNodes,
- zoneDescriptor.replicas(),
- storageRevision,
- metaStorageManager,
- finalPartId,
- zoneAssignments.get(finalPartId).nodes()
- ));
+ partitionFutures[partId] =
zoneAssignmentsFut.thenCompose(zoneAssignments -> inBusyLockAsync(busyLock, ()
-> {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-19763 We
should distinguish empty stable assignments on
+ // TODO node recovery in case of interrupted table creation,
and moving from empty assignments to non-empty.
+ return zoneAssignments.isEmpty() ? nullCompletedFuture() :
updatePendingAssignmentsKeys(
+ zoneDescriptor,
+ replicaGrpId,
+ dataNodes,
+ zoneDescriptor.replicas(),
+ storageRevision,
+ metaStorageManager,
+ finalPartId,
+ zoneAssignments.get(finalPartId).nodes()
+ );
+ }));
}
// This set is used to deduplicate exceptions (if there is an
exception from upstream, for instance,
diff --git a/modules/partition-replicator/build.gradle
b/modules/partition-replicator/build.gradle
index 1911df7768..b6b52014b8 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -39,6 +39,7 @@ dependencies {
implementation project(':ignite-schema')
implementation project(':ignite-transactions')
implementation project(':ignite-storage-api')
+ implementation project(':ignite-low-watermark')
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation testFixtures(project(':ignite-core'))
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index f0a435f281..77a90e9aff 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZo
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.FEATURE_FLAG_NAME;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
@@ -35,6 +36,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.apache.ignite.sql.ColumnType.INT64;
@@ -44,18 +46,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
@@ -103,6 +110,8 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -219,10 +228,21 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
@InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE +
".engine = \"aipersist\", test.engine=\"test\"}")
private static StorageConfiguration storageConfiguration;
+ private final List<NetworkAddress> nodeAddresses = new ArrayList<>();
+
+ private final List<NodeAttributesConfiguration>
nodeAttributesConfigurations = new ArrayList<>(
+ List.of(nodeAttributes1, nodeAttributes2, nodeAttributes3)
+ );
+
+ /**
+ * Interceptors of {@link MetaStorageManager#invoke(Condition, Collection,
Collection)}, keyed by node index.
+ */
+ private final Map<Integer, InvokeInterceptor>
metaStorageInvokeInterceptorByNode = new ConcurrentHashMap<>();
+
@WorkDirectory
private Path workDir;
- private List<Node> nodes;
+ private Map<Integer, Node> nodes;
private TestPlacementDriver placementDriver;
@@ -243,28 +263,29 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
@BeforeEach
- void before(TestInfo testInfo) throws Exception {
- nodes = new ArrayList<>();
+ void before(TestInfo testInfo) {
+ nodes = new HashMap<>();
placementDriver = new TestPlacementDriver();
- List<NetworkAddress> nodeAddresses = new ArrayList<>();
-
- List<NodeAttributesConfiguration> nodeAttributesConfigurations = new
ArrayList<>();
- nodeAttributesConfigurations.addAll(List.of(nodeAttributes1,
nodeAttributes2, nodeAttributes3));
-
for (int i = 0; i < NODE_COUNT; i++) {
nodeAddresses.add(new NetworkAddress(HOST, BASE_PORT + i));
}
finder = new StaticNodeFinder(nodeAddresses);
+ }
- int i = 0;
+ @AfterEach
+ void after() {
+ metaStorageInvokeInterceptorByNode.clear();
+ nodes.values().forEach(Node::stop);
+ }
- for (NetworkAddress addr : nodeAddresses) {
- var node = new Node(testInfo, addr,
nodeAttributesConfigurations.get(i++));
+ private void startNodes(TestInfo testInfo, int amount) throws
NodeStoppingException, InterruptedException {
+ for (int i = 0; i < amount; i++) {
+ var node = new Node(testInfo, i);
- nodes.add(node);
+ nodes.put(i, node);
node.start();
}
@@ -273,10 +294,10 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name),
"cluster");
- nodes.forEach(Node::waitWatches);
+ nodes.values().forEach(Node::waitWatches);
assertThat(
- allOf(nodes.stream().map(n ->
n.cmgManager.onJoinReady()).toArray(CompletableFuture[]::new)),
+ allOf(nodes.values().stream().map(n ->
n.cmgManager.onJoinReady()).toArray(CompletableFuture[]::new)),
willCompleteSuccessfully()
);
@@ -286,21 +307,40 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
assertThat(logicalTopologyFuture,
willCompleteSuccessfully());
- return logicalTopologyFuture.join().nodes().size() ==
NODE_COUNT;
+ return logicalTopologyFuture.join().nodes().size() ==
amount;
},
AWAIT_TIMEOUT_MILLIS
));
}
- @AfterEach
- void after() {
- nodes.forEach(Node::stop);
+ private Node startNode(TestInfo testInfo, int idx) {
+ var node = new Node(testInfo, idx);
+
+ nodes.put(idx, node);
+
+ node.start();
+
+ node.waitWatches();
+
+ assertThat(node.cmgManager.onJoinReady(), willCompleteSuccessfully());
+
+ return node;
+ }
+
+ private void stopNode(int idx) {
+ Node node = getNode(idx);
+
+ node.stop();
+
+ nodes.remove(idx);
}
@Test
- public void testEmptyReplicaListener() throws NodeStoppingException {
+ public void testEmptyReplicaListener(TestInfo testInfo) throws Exception {
+ startNodes(testInfo, 3);
+
Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
- nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 1).toArray()[0];
Node node = getNode(replicaAssignment.consistentId());
@@ -323,13 +363,10 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
@Test
- void testAlterReplicaTrigger() throws Exception {
- Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
- nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
-
- Node node = getNode(replicaAssignment.consistentId());
+ void testAlterReplicaTrigger(TestInfo testInfo) throws Exception {
+ startNodes(testInfo, 3);
-
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+ Node node = getNode(0);
createZone(node, "test_zone", 1, 3);
@@ -344,7 +381,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
stablePartAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
- nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toSet()),
20_000L
);
@@ -360,24 +397,13 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
2,
20_000L
);
-
- assertValueInStorage(
- metaStorageManager,
- stablePartAssignmentsKey(partId),
- (v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
- true,
- 20_000L
- );
}
@Test
- void testAlterReplicaTriggerDefaultZone() throws Exception {
- Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
- nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
+ void testAlterReplicaTriggerDefaultZone(TestInfo testInfo) throws
Exception {
+ startNodes(testInfo, 3);
- Node node = getNode(replicaAssignment.consistentId());
-
-
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+ Node node = getNode(0);
CatalogManager catalogManager = node.catalogManager;
@@ -408,22 +434,13 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
2,
20_000L
);
-
- assertValueInStorage(
- metaStorageManager,
- stablePartAssignmentsKey(partId),
- (v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
- true,
- 20_000L
- );
}
@Test
- void testAlterReplicaExtensionTrigger() throws Exception {
- Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
- nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
+ void testAlterReplicaExtensionTrigger(TestInfo testInfo) throws Exception {
+ startNodes(testInfo, 3);
- Node node = getNode(replicaAssignment.consistentId());
+ Node node = getNode(0);
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
@@ -446,14 +463,6 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
20_000L
);
- assertValueInStorage(
- metaStorageManager,
- stablePartAssignmentsKey(partId),
- (v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
- true,
- 20_000L
- );
-
CatalogManager catalogManager = node.catalogManager;
alterZone(catalogManager, "test_zone", 3);
@@ -463,17 +472,16 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
stablePartAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
- nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toSet()),
20_000L
);
}
@Test
- void testAlterFilterTrigger() throws Exception {
- Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
- nodes.stream().map(n -> n.name).collect(Collectors.toList()),
0, 1).toArray()[0];
+ void testAlterFilterTrigger(TestInfo testInfo) throws Exception {
+ startNodes(testInfo, 3);
- Node node = getNode(replicaAssignment.consistentId());
+ Node node = getNode(0);
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
@@ -490,7 +498,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
stablePartAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
- nodes.stream().map(n -> n.name).collect(Collectors.toSet()),
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toSet()),
20_000L
);
@@ -508,14 +516,168 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
Set.of(nodes.get(0).name),
20_000L
);
+ }
+
+ @Test
+ void testReplicaIsStartedOnNodeStart(TestInfo testInfo) throws Exception {
+ startNodes(testInfo, 3);
+
+ Node node0 = getNode(0);
+
+ createZone(node0, "test_zone", 2, 3);
+
+ int zoneId = DistributionZonesTestUtil.getZoneId(node0.catalogManager,
"test_zone", node0.hybridClock.nowLong());
+
+ MetaStorageManager metaStorageManager = node0.metaStorageManager;
+
+ ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> Assignments.fromBytes(v).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toSet()),
+ 20_000L
+ );
+
+ stopNode(2);
+
+ Node node2 = startNode(testInfo, 2);
+
+ assertTrue(waitForCondition(() ->
node2.replicaManager.isReplicaStarted(partId), 10_000L));
+ }
+
+ @Test
+ void testStableAreWrittenAfterRestart(TestInfo testInfo) throws Exception {
+ startNodes(testInfo, 1);
+
+ Node node0 = getNode(0);
+
+ AtomicBoolean reached = new AtomicBoolean();
+
+ metaStorageInvokeInterceptorByNode.put(0, (condition, success,
failure) -> {
+ if (skipMetaStorageInvoke(success, STABLE_ASSIGNMENTS_PREFIX)) {
+ reached.set(true);
+
+ return true;
+ }
+
+ return null;
+ });
+
+ createZone(node0, "test_zone", 2, 3);
+
+ int zoneId = DistributionZonesTestUtil.getZoneId(node0.catalogManager,
"test_zone", node0.hybridClock.nowLong());
+
+ MetaStorageManager metaStorageManager = node0.metaStorageManager;
+
+ ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+ assertTrue(reached.get());
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ Assignments::fromBytes,
+ null,
+ 20_000L
+ );
+
+ stopNode(0);
+
+ metaStorageInvokeInterceptorByNode.clear();
+
+ startNodes(testInfo, 1);
+
+ node0 = getNode(0);
+
+ metaStorageManager = node0.metaStorageManager;
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> Assignments.fromBytes(v).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toSet()),
+ 20_000L
+ );
+
+ assertTrue(waitForCondition(() ->
getNode(0).replicaManager.isReplicaStarted(partId), 10_000L));
+ }
+
+ @Test
+ void testStableAreWrittenAfterRestartAndConcurrentStableUpdate(TestInfo
testInfo) throws Exception {
+ startNodes(testInfo, 1);
+
+ Node node0 = getNode(0);
+
+ AtomicBoolean reached = new AtomicBoolean();
+
+ metaStorageInvokeInterceptorByNode.put(0, (condition, success,
failure) -> {
+ if (skipMetaStorageInvoke(success, STABLE_ASSIGNMENTS_PREFIX)) {
+ reached.set(true);
+
+ return true;
+ }
+
+ return null;
+ });
+
+ createZone(node0, "test_zone", 1, 3);
+
+ int zoneId = DistributionZonesTestUtil.getZoneId(node0.catalogManager,
"test_zone", node0.hybridClock.nowLong());
+
+ MetaStorageManager metaStorageManager = node0.metaStorageManager;
+
+ ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
+
+ assertTrue(reached.get());
+
+ reached.set(false);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ Assignments::fromBytes,
+ null,
+ 20_000L
+ );
+
+ stopNode(0);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ metaStorageInvokeInterceptorByNode.put(0, (condition, success,
failure) -> {
+ if (skipMetaStorageInvoke(success,
stablePartAssignmentsKey(partId).toString())) {
+ reached.set(true);
+
+ Node node = nodes.get(0);
+
+ node.metaStorageManager.put(stablePartAssignmentsKey(partId),
Assignments.of(Assignment.forPeer(node.name)).toBytes());
+ }
+
+ return null;
+ });
+
+ startNodes(testInfo, 1);
+
+ node0 = getNode(0);
+
+ metaStorageManager = node0.metaStorageManager;
+
+ assertTrue(reached.get());
assertValueInStorage(
metaStorageManager,
stablePartAssignmentsKey(partId),
- (v) ->
Assignments.fromBytes(v).nodes().contains(replicaAssignment),
- true,
+ (v) -> Assignments.fromBytes(v).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toSet()),
20_000L
);
+
+ assertTrue(waitForCondition(() ->
getNode(0).replicaManager.isReplicaStarted(partId), 10_000L));
}
private Node getNode(int nodeIndex) {
@@ -523,7 +685,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
private Node getNode(String nodeName) {
- return nodes.stream().filter(n ->
n.name.equals(nodeName)).findFirst().get();
+ return nodes.values().stream().filter(n ->
n.name.equals(nodeName)).findFirst().get();
}
private static void createZone(Node node, String zoneName, int partitions,
int replicas) {
@@ -635,10 +797,10 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
/**
* Constructor that simply creates a subset of components of this node.
*/
- Node(TestInfo testInfo, NetworkAddress addr,
NodeAttributesConfiguration nodeAttributes) {
- networkAddress = addr;
+ Node(TestInfo testInfo, int idx) {
+ networkAddress = nodeAddresses.get(idx);
- name = testNodeName(testInfo, addr.port());
+ name = testNodeName(testInfo, networkAddress.port());
Path dir = workDir.resolve(name);
@@ -672,7 +834,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
- addr.port(),
+ networkAddress.port(),
finder
);
@@ -711,7 +873,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
+ new
NodeAttributesCollector(nodeAttributesConfigurations.get(idx),
storageConfiguration),
failureProcessor
);
@@ -738,7 +900,26 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
metaStorageConfiguration,
raftConfiguration.retryTimeout(),
completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
- );
+ ) {
+ @Override
+ public CompletableFuture<Boolean> invoke(
+ Condition condition,
+ Collection<Operation> success,
+ Collection<Operation> failure
+ ) {
+ InvokeInterceptor invokeInterceptor =
metaStorageInvokeInterceptorByNode.get(idx);
+
+ if (invokeInterceptor != null) {
+ var res = invokeInterceptor.invoke(condition, success,
failure);
+
+ if (res != null) {
+ return completedFuture(res);
+ }
+ }
+
+ return super.invoke(condition, success, failure);
+ }
+ };
threadPoolsManager = new ThreadPoolsManager(name);
@@ -818,7 +999,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
replicaSvc,
lockManager,
clockService,
- new TransactionIdGenerator(addr.port()),
+ new TransactionIdGenerator(networkAddress.port()),
placementDriver,
partitionIdleSafeTimePropagationPeriodMsSupplier,
new TestLocalRwTxCounter(),
@@ -878,6 +1059,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
distributionZoneManager,
metaStorageManager,
clusterService.topologyService(),
+ lowWatermark,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler
);
@@ -1026,4 +1208,13 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
clusterCfgGenerator.close();
}
}
+
+ @FunctionalInterface
+ private interface InvokeInterceptor {
+ Boolean invoke(Condition condition, Collection<Operation> success,
Collection<Operation> failure);
+ }
+
+ private static boolean skipMetaStorageInvoke(Collection<Operation> ops,
String prefix) {
+ return ops.stream().anyMatch(op -> new String(toByteArray(op.key()),
StandardCharsets.UTF_8).startsWith(prefix));
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index fa5fc9679b..1b7e7db986 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -38,6 +38,7 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -50,6 +51,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -63,6 +65,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -76,12 +79,14 @@ import
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
@@ -101,6 +106,7 @@ import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -129,6 +135,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
private final TopologyService topologyService;
+ private final LowWatermark lowWatermark;
+
/** Meta storage listener for pending assignments. */
private final WatchListener pendingAssignmentsRebalanceListener;
@@ -170,6 +178,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
DistributionZoneManager distributionZoneMgr,
MetaStorageManager metaStorageMgr,
TopologyService topologyService,
+ LowWatermark lowWatermark,
ExecutorService ioExecutor,
ScheduledExecutorService rebalanceScheduler
) {
@@ -178,6 +187,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
this.distributionZoneMgr = distributionZoneMgr;
this.metaStorageMgr = metaStorageMgr;
this.topologyService = topologyService;
+ this.lowWatermark = lowWatermark;
this.ioExecutor = ioExecutor;
this.rebalanceScheduler = rebalanceScheduler;
@@ -192,6 +202,17 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
return nullCompletedFuture();
}
+ CompletableFuture<Long> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
+
+ assert recoveryFinishFuture.isDone();
+
+ long recoveryRevision = recoveryFinishFuture.join();
+
+ cleanUpResourcesForDroppedZonesOnRecovery();
+
+ CompletableFuture<Void> processZonesAndAssignmentsOnStart =
processZonesOnStart(recoveryRevision, lowWatermark.getLowWatermark())
+ .thenCompose(ignored ->
processAssignmentsOnRecovery(recoveryRevision));
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
@@ -203,22 +224,121 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
inBusyLock(busyLock, () ->
onCreateZone(parameters).thenApply((ignored) -> false))
);
- return nullCompletedFuture();
+ return processZonesAndAssignmentsOnStart;
+ }
+
+ private CompletableFuture<Void> processZonesOnStart(long recoveryRevision,
@Nullable HybridTimestamp lwm) {
+ int earliestCatalogVersion =
catalogMgr.activeCatalogVersion(hybridTimestampToLong(lwm));
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22679
+ int latestCatalogVersion = catalogMgr.latestCatalogVersion();
+
+ var startedZones = new IntOpenHashSet();
+ var startZoneFutures = new ArrayList<CompletableFuture<?>>();
+
+ for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion;
ver--) {
+ int ver0 = ver;
+ catalogMgr.zones(ver).stream()
+ .filter(zone -> startedZones.add(zone.id()))
+ .forEach(zoneDescriptor -> startZoneFutures.add(
+
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0,
zoneDescriptor)));
+ }
+
+ return allOf(startZoneFutures.toArray(CompletableFuture[]::new))
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Error starting zones", throwable);
+ } else {
+ LOG.debug(
+ "Zones started successfully
[earliestCatalogVersion={}, latestCatalogVersion={}, startedZoneIds={}]",
+ earliestCatalogVersion,
+ latestCatalogVersion,
+ startedZones
+ );
+ }
+ });
+ }
+
+ private CompletableFuture<Void> processAssignmentsOnRecovery(long
recoveryRevision) {
+ var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
+ var pendingAssignmentsPrefix = new
ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+
+ // It's required to handle stable assignments changes on recovery in
order to clean up obsolete resources.
+ CompletableFuture<Void> stableFuture = handleAssignmentsOnRecovery(
+ stableAssignmentsPrefix,
+ recoveryRevision,
+ (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev,
true),
+ "stable"
+ );
+
+ CompletableFuture<Void> pendingFuture = handleAssignmentsOnRecovery(
+ pendingAssignmentsPrefix,
+ recoveryRevision,
+ (entry, rev) -> handleChangePendingAssignmentEvent(entry, rev,
true),
+ "pending"
+ );
+
+ return allOf(stableFuture, pendingFuture);
+ }
+
+ private CompletableFuture<Void> handleAssignmentsOnRecovery(
+ ByteArray prefix,
+ long revision,
+ BiFunction<Entry, Long, CompletableFuture<Void>>
assignmentsEventHandler,
+ String assignmentsType
+ ) {
+ try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix,
revision)) {
+ CompletableFuture<?>[] futures = cursor.stream()
+ .map(entry -> {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Non handled {} assignments for key '{}'
discovered, performing recovery",
+ assignmentsType,
+ new String(entry.key(), UTF_8)
+ );
+ }
+
+ return assignmentsEventHandler.apply(entry, revision);
+ })
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures)
+ .whenComplete((res, e) -> {
+ if (e != null) {
+ LOG.error("Error when performing assignments
recovery", e);
+ }
+ });
+ }
+ }
+
+ private void cleanUpResourcesForDroppedZonesOnRecovery() {
+ // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from vault and metastore
}
private CompletableFuture<Void> onCreateZone(CreateZoneEventParameters
createZoneEventParameters) {
// TODO: https://issues.apache.org/jira/browse/IGNITE-22535 start
replica must be moved from metastore thread
+ return calculateZoneAssignmentsAndCreateReplicationNodes(
+ createZoneEventParameters.causalityToken(),
+ createZoneEventParameters.catalogVersion(),
+ createZoneEventParameters.zoneDescriptor()
+ );
+ }
+
+ private CompletableFuture<Void>
calculateZoneAssignmentsAndCreateReplicationNodes(
+ long causalityToken,
+ int catalogVersion,
+ CatalogZoneDescriptor zoneDescriptor
+ ) {
return inBusyLockAsync(busyLock, () -> {
CompletableFuture<List<Assignments>> assignmentsFuture =
getOrCreateAssignments(
- createZoneEventParameters.zoneDescriptor(),
- createZoneEventParameters.causalityToken(),
- createZoneEventParameters.catalogVersion()
+ zoneDescriptor,
+ causalityToken,
+ catalogVersion
);
CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke =
-
writeZoneAssignmentsToMetastore(createZoneEventParameters.zoneDescriptor().id(),
assignmentsFuture);
+ writeZoneAssignmentsToMetastore(zoneDescriptor.id(),
assignmentsFuture);
- return createZoneReplicationNodes(assignmentsFutureAfterInvoke,
createZoneEventParameters.zoneDescriptor().id());
+ return createZoneReplicationNodes(assignmentsFutureAfterInvoke,
zoneDescriptor.id());
});
}
@@ -260,6 +380,9 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
return nullCompletedFuture();
}
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 We need to
integrate PartitionReplicatorNodeRecovery logic here when
+ // we are in the recovery phase.
+
PeersAndLearners stablePeersAndLearners =
fromAssignments(stableAssignments.nodes());
ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, partId);
@@ -285,7 +408,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
new FailFastSnapshotStorageFactory(),
stablePeersAndLearners,
raftGroupListener,
- raftGroupEventsListener
+ raftGroupEventsListener,
+ busyLock
).thenApply(ignored -> null);
} catch (NodeStoppingException e) {
return failedFuture(e);
@@ -772,7 +896,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
localServicesStartFuture = nullCompletedFuture();
}
- return localServicesStartFuture.thenRunAsync(() -> {
+ return localServicesStartFuture.thenRunAsync(() ->
inBusyLock(busyLock, () -> {
if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
return;
}
@@ -784,7 +908,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
: RebalanceUtil.union(pendingAssignmentsNodes,
stableAssignments.nodes());
replicaMgr.replica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
- }, ioExecutor);
+ }), ioExecutor);
}
private CompletableFuture<Void> changePeersOnRebalance(
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index fd3d919f64..bdf06f50bd 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -608,7 +608,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
updateTableRaftService,
createListener,
storageIndexTracker,
- newRaftClientFut);
+ newRaftClientFut
+ );
}
/**
@@ -678,7 +679,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
SnapshotStorageFactory snapshotStorageFactory,
PeersAndLearners newConfiguration,
RaftGroupListener raftGroupListener,
- RaftGroupEventsListener raftGroupEventsListener
+ RaftGroupEventsListener raftGroupEventsListener,
+ IgniteSpinBusyLock busyLock
) throws NodeStoppingException {
RaftGroupOptions groupOptions = groupOptionsForPartition(
false,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bbdaa853d0..736cd84bd2 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -748,6 +748,7 @@ public class IgniteImpl implements Ignite {
distributionZoneManager,
metaStorageMgr,
clusterSvc.topologyService(),
+ lowWatermark,
threadPoolsManager.tableIoExecutor(),
rebalanceScheduler
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ff6c7ce7f0..da8639b6a8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -671,7 +671,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
.map(entry -> {
if (LOG.isInfoEnabled()) {
LOG.info(
- "Missed {} assignments for key '{}'
discovered, performing recovery",
+ "Non handled {} assignments for key '{}'
discovered, performing recovery",
assignmentsType,
new String(entry.key(), UTF_8)
);