This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch ignite-3.0
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0 by this push:
new 59c0160a8a IGNITE-24138 Fix highestRevision check for HA reset case
(#5020)
59c0160a8a is described below
commit 59c0160a8aa7807ff59322e835f40380ef382558
Author: Kirill Gusakov <[email protected]>
AuthorDate: Mon Jan 13 20:58:16 2025 +0300
IGNITE-24138 Fix highestRevision check for HA reset case (#5020)
(cherry picked from commit 48b06e03db7eb2edb0001080ee74a211848dde38)
---
.../distributionzones/DistributionZoneManager.java | 30 ++++-
modules/table/build.gradle | 1 +
.../ItHighAvailablePartitionsRecoveryTest.java | 24 +---
...lablePartitionsRecoveryWithNodeRestartTest.java | 134 +++++++++++++++++++++
4 files changed, 163 insertions(+), 26 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 1e6203b7bf..c7ddb55828 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -75,6 +75,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -278,6 +279,8 @@ public class DistributionZoneManager extends
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return inBusyLockAsync(busyLock, () -> {
+ partitionDistributionResetTimeoutConfiguration.init();
+
registerCatalogEventListenersOnStartManagerBusy();
logicalTopologyService.addEventListener(topologyEventListener);
@@ -301,8 +304,6 @@ public class DistributionZoneManager extends
// fires CatalogManager's ZONE_CREATE event, and the state of
DistributionZoneManager becomes consistent.
int catalogVersion = catalogManager.latestCatalogVersion();
- partitionDistributionResetTimeoutConfiguration.init();
-
return allOf(
createOrRestoreZonesStates(recoveryRevision,
catalogVersion),
restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision,
catalogVersion)
@@ -394,6 +395,18 @@ public class DistributionZoneManager extends
int partitionDistributionResetTimeoutSeconds,
long causalityToken
) {
+ CompletableFuture<Revisions> recoveryFuture =
metaStorageManager.recoveryFinishedFuture();
+
+ // At the moment of the first call to this method from configuration
notifications,
+ // it is guaranteed that Meta Storage has been recovered.
+ assert recoveryFuture.isDone();
+
+ if (recoveryFuture.join().revision() >= causalityToken) {
+ // The configuration already had the right value at configuration
init,
+ // and all timers were started with the right configuration timeouts on
recovery.
+ return;
+ }
+
long updateTimestamp = timestampByRevision(causalityToken);
if (updateTimestamp == -1) {
@@ -413,7 +426,7 @@ public class DistributionZoneManager extends
ZoneState zoneState = zoneStateEntry.getValue();
if (partitionDistributionResetTimeoutSeconds !=
INFINITE_TIMER_VALUE) {
- Optional<Long> highestRevision =
zoneState.highestRevision(true);
+ Optional<Long> highestRevision = zoneState.highestRevision();
assert highestRevision.isEmpty() || causalityToken >=
highestRevision.get() : IgniteStringFormatter.format(
"Expected causalityToken that is greater or equal to
already seen meta storage events: highestRevision={}, "
@@ -1506,6 +1519,17 @@ public class DistributionZoneManager extends
.map(Map.Entry::getKey);
}
+ /**
+ * Returns the highest revision that is present in the {@link
ZoneState#topologyAugmentationMap()}.
+ *
+ * @return The highest revision that is present in the {@link
ZoneState#topologyAugmentationMap()}.
+ */
+ Optional<Long> highestRevision() {
+ return topologyAugmentationMap().keySet()
+ .stream()
+ .max(Comparator.naturalOrder());
+ }
+
@TestOnly
public synchronized ScheduledFuture<?> scaleUpTask() {
return scaleUpTask;
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index b0e1aa4571..28d1f9e6de 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -162,6 +162,7 @@ dependencies {
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.calcite.core
integrationTestImplementation libs.rocksdb.jni
+ integrationTestImplementation libs.awaitility
}
description = 'ignite-table'
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
index 0616910e58..f667197268 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
@@ -46,7 +46,7 @@ import
org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.junit.jupiter.api.Test;
/** Test for the HA zones recovery. */
-public class ItHighAvailablePartitionsRecoveryTest extends
AbstractHighAvailablePartitionsRecoveryTest {
+public class ItHighAvailablePartitionsRecoveryTest extends
AbstractHighAvailablePartitionsRecoveryTest {
@Override
protected int initialNodes() {
return 3;
@@ -160,28 +160,6 @@ public class ItHighAvailablePartitionsRecoveryTest
extends AbstractHighAvailabl
waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME,
PARTITION_IDS, Set.of(node.name()));
}
- @Test
- void testHaRecoveryOnZoneTimersRestoreAfterNodeRestart() throws
InterruptedException {
- createHaZoneWithTable();
-
- IgniteImpl node = igniteImpl(0);
-
- changePartitionDistributionTimeout(node, 10);
-
- assertRecoveryKeyIsEmpty(node);
-
- stopNodes(2, 1, 0);
-
- IgniteImpl node1 = unwrapIgniteImpl(startNode(0));
-
- waitAndAssertRecoveryKeyIsNotEmpty(node1, 30_000);
-
- assertRecoveryRequestForHaZoneTable(node1);
- assertRecoveryRequestWasOnlyOne(node1);
-
- waitAndAssertStableAssignmentsOfPartitionEqualTo(node1, HA_TABLE_NAME,
PARTITION_IDS, Set.of(node1.name()));
- }
-
@Test
void testScaleUpAfterReset() throws InterruptedException {
createHaZoneWithTable();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryWithNodeRestartTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryWithNodeRestartTest.java
new file mode 100644
index 0000000000..3a5ad53f82
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryWithNodeRestartTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+/** Test for the HA zones recovery with node restarts. */
+class ItHighAvailablePartitionsRecoveryWithNodeRestartTest extends
AbstractHighAvailablePartitionsRecoveryTest {
+ /** How often we update the low water mark. */
+ private static final long LW_UPDATE_TIME_MS =
TimeUnit.MILLISECONDS.toMillis(500);
+
+ /** It should be less than {@link #LW_UPDATE_TIME_MS} for the test to
work. */
+ private static final long AIPERSIST_CHECKPOINT_INTERVAL_MS =
LW_UPDATE_TIME_MS / 2;
+
+ /** Should be greater than 2 x {@link #LW_UPDATE_TIME_MS} and long enough
to wait for the catalog compaction to finish. */
+ private static final long CATALOG_COMPACTION_AWAIT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(10);
+
+ private static final String
FAST_FAILURE_DETECTION_AND_FAST_CHECKPOINT_NODE_BOOTSTRAP_CFG_TEMPLATE =
"ignite {\n"
+ + " network: {\n"
+ + " port: {},\n"
+ + " nodeFinder: {\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " },\n"
+ + " membership: {\n"
+ + " membershipSyncInterval: 1000,\n"
+ + " failurePingInterval: 500,\n"
+ + " scaleCube: {\n"
+ + " membershipSuspicionMultiplier: 1,\n"
+ + " failurePingRequestMembers: 1,\n"
+ + " gossipInterval: 10\n"
+ + " },\n"
+ + " }\n"
+ + " },\n"
+ + " storage: {\n"
+ + " engines: {\n"
+ + " aipersist: {\n"
+ + " checkpoint: {\n"
+ + " interval: " + AIPERSIST_CHECKPOINT_INTERVAL_MS + "\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " clientConnector: { port:{} }, \n"
+ + " rest.port: {},\n"
+ + " failureHandler.dumpThreadsOnFailure: false\n"
+ + "}";
+
+ @Override
+ protected int initialNodes() {
+ return 3;
+ }
+
+ @Override
+ protected String getNodeBootstrapConfigTemplate() {
+ return
FAST_FAILURE_DETECTION_AND_FAST_CHECKPOINT_NODE_BOOTSTRAP_CFG_TEMPLATE;
+ }
+
+ @Override
+ protected void customizeInitParameters(InitParametersBuilder builder) {
+ // Configuration to shorten the catalog compaction time.
+ String clusterConfiguration = format(
+ "ignite {\n"
+ + "gc: {lowWatermark: {dataAvailabilityTime: {},
updateInterval: {}}},\n"
+ + "}",
+ LW_UPDATE_TIME_MS * 2, LW_UPDATE_TIME_MS
+ );
+
+ builder.clusterConfiguration(clusterConfiguration);
+ }
+
+ @Test
+ void
testHaRecoveryOnZoneTimersRestoreAfterCatalogCompactionAndNodeRestart() throws
InterruptedException {
+ IgniteImpl node = igniteImpl(0);
+
+ changePartitionDistributionTimeout(node, 10);
+
+ createHaZoneWithTable();
+
+ assertRecoveryKeyIsEmpty(node);
+
+ // Wait for catalog compaction.
+ expectEarliestCatalogVersionGreaterThanZero();
+
+ stopNodes(2, 1, 0);
+
+ IgniteImpl node1 = unwrapIgniteImpl(startNode(0));
+
+ waitAndAssertRecoveryKeyIsNotEmpty(node1, 30_000);
+
+ assertRecoveryRequestForHaZoneTable(node1);
+ assertRecoveryRequestWasOnlyOne(node1);
+
+ waitAndAssertStableAssignmentsOfPartitionEqualTo(node1, HA_TABLE_NAME,
PARTITION_IDS, Set.of(node1.name()));
+ }
+
+ private void expectEarliestCatalogVersionGreaterThanZero() {
+ Awaitility.await().timeout(CATALOG_COMPACTION_AWAIT_INTERVAL_MS,
TimeUnit.MILLISECONDS).untilAsserted(() -> {
+ for (var node : runningNodes().collect(Collectors.toList())) {
+ IgniteImpl ignite = unwrapIgniteImpl(node);
+ CatalogManagerImpl catalogManager = ((CatalogManagerImpl)
ignite.catalogManager());
+
+ assertThat("The earliest catalog version does not match. ",
+ catalogManager.earliestCatalogVersion(),
greaterThan(0));
+ }
+ });
+ }
+}