This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-3.0
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0 by this push:
     new 59c0160a8a IGNITE-24138 Fix highestRevision check for HA reset case (#5020)
59c0160a8a is described below

commit 59c0160a8aa7807ff59322e835f40380ef382558
Author: Kirill Gusakov <[email protected]>
AuthorDate: Mon Jan 13 20:58:16 2025 +0300

    IGNITE-24138 Fix highestRevision check for HA reset case (#5020)
    
    (cherry picked from commit 48b06e03db7eb2edb0001080ee74a211848dde38)
---
 .../distributionzones/DistributionZoneManager.java |  30 ++++-
 modules/table/build.gradle                         |   1 +
 .../ItHighAvailablePartitionsRecoveryTest.java     |  24 +---
 ...lablePartitionsRecoveryWithNodeRestartTest.java | 134 +++++++++++++++++++++
 4 files changed, 163 insertions(+), 26 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 1e6203b7bf..c7ddb55828 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -75,6 +75,7 @@ import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -278,6 +279,8 @@ public class DistributionZoneManager extends
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
         return inBusyLockAsync(busyLock, () -> {
+            partitionDistributionResetTimeoutConfiguration.init();
+
             registerCatalogEventListenersOnStartManagerBusy();
 
             logicalTopologyService.addEventListener(topologyEventListener);
@@ -301,8 +304,6 @@ public class DistributionZoneManager extends
             // fires CatalogManager's ZONE_CREATE event, and the state of DistributionZoneManager becomes consistent.
             int catalogVersion = catalogManager.latestCatalogVersion();
 
-            partitionDistributionResetTimeoutConfiguration.init();
-
             return allOf(
                     createOrRestoreZonesStates(recoveryRevision, catalogVersion),
                     restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision, catalogVersion)
@@ -394,6 +395,18 @@ public class DistributionZoneManager extends
             int partitionDistributionResetTimeoutSeconds,
             long causalityToken
     ) {
+        CompletableFuture<Revisions> recoveryFuture = metaStorageManager.recoveryFinishedFuture();
+
+        // At the moment of the first call to this method from configuration notifications,
+        // it is guaranteed that Meta Storage has been recovered.
+        assert recoveryFuture.isDone();
+
+        if (recoveryFuture.join().revision() >= causalityToken) {
+            // So, configuration already has the right value on configuration init
+            // and all timers started with the right configuration timeouts on recovery.
+            return;
+        }
+
         long updateTimestamp = timestampByRevision(causalityToken);
 
         if (updateTimestamp == -1) {
@@ -413,7 +426,7 @@ public class DistributionZoneManager extends
             ZoneState zoneState = zoneStateEntry.getValue();
 
             if (partitionDistributionResetTimeoutSeconds != INFINITE_TIMER_VALUE) {
-                Optional<Long> highestRevision = zoneState.highestRevision(true);
+                Optional<Long> highestRevision = zoneState.highestRevision();
 
                 assert highestRevision.isEmpty() || causalityToken >= highestRevision.get() : IgniteStringFormatter.format(
                         "Expected causalityToken that is greater or equal to already seen meta storage events: highestRevision={}, "
@@ -1506,6 +1519,17 @@ public class DistributionZoneManager extends
                     .map(Map.Entry::getKey);
         }
 
+        /**
+         * Returns the highest revision which is presented in the {@link ZoneState#topologyAugmentationMap()}.
+         *
+         * @return The highest revision which is presented in the {@link ZoneState#topologyAugmentationMap()}.
+         */
+        Optional<Long> highestRevision() {
+            return topologyAugmentationMap().keySet()
+                    .stream()
+                    .max(Comparator.naturalOrder());
+        }
+
         @TestOnly
         public synchronized ScheduledFuture<?> scaleUpTask() {
             return scaleUpTask;
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index b0e1aa4571..28d1f9e6de 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -162,6 +162,7 @@ dependencies {
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation libs.calcite.core
     integrationTestImplementation libs.rocksdb.jni
+    integrationTestImplementation libs.awaitility
 }
 
 description = 'ignite-table'
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
index 0616910e58..f667197268 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
@@ -46,7 +46,7 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.junit.jupiter.api.Test;
 
 /** Test for the HA zones recovery. */
-public class ItHighAvailablePartitionsRecoveryTest  extends AbstractHighAvailablePartitionsRecoveryTest {
+public class ItHighAvailablePartitionsRecoveryTest extends AbstractHighAvailablePartitionsRecoveryTest {
     @Override
     protected int initialNodes() {
         return 3;
@@ -160,28 +160,6 @@ public class ItHighAvailablePartitionsRecoveryTest  extends AbstractHighAvailabl
         waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, Set.of(node.name()));
     }
 
-    @Test
-    void testHaRecoveryOnZoneTimersRestoreAfterNodeRestart() throws InterruptedException {
-        createHaZoneWithTable();
-
-        IgniteImpl node = igniteImpl(0);
-
-        changePartitionDistributionTimeout(node, 10);
-
-        assertRecoveryKeyIsEmpty(node);
-
-        stopNodes(2, 1, 0);
-
-        IgniteImpl node1 = unwrapIgniteImpl(startNode(0));
-
-        waitAndAssertRecoveryKeyIsNotEmpty(node1, 30_000);
-
-        assertRecoveryRequestForHaZoneTable(node1);
-        assertRecoveryRequestWasOnlyOne(node1);
-
-        waitAndAssertStableAssignmentsOfPartitionEqualTo(node1, HA_TABLE_NAME, PARTITION_IDS, Set.of(node1.name()));
-    }
-
     @Test
     void testScaleUpAfterReset() throws InterruptedException {
         createHaZoneWithTable();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryWithNodeRestartTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryWithNodeRestartTest.java
new file mode 100644
index 0000000000..3a5ad53f82
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryWithNodeRestartTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+/** Test for the HA zones recovery with node restarts. */
+class ItHighAvailablePartitionsRecoveryWithNodeRestartTest extends AbstractHighAvailablePartitionsRecoveryTest {
+    /** How often we update the low water mark. */
+    private static final long LW_UPDATE_TIME_MS = TimeUnit.MILLISECONDS.toMillis(500);
+
+    /** It should be less than {@link #LW_UPDATE_TIME_MS} for the test to work. */
+    private static final long AIPERSIST_CHECKPOINT_INTERVAL_MS = LW_UPDATE_TIME_MS / 2;
+
+    /** Should be greater than 2 x {@link #LW_UPDATE_TIME_MS} and long enough to await for the catalog compaction finish. */
+    private static final long CATALOG_COMPACTION_AWAIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
+
+    private static final String FAST_FAILURE_DETECTION_AND_FAST_CHECKPOINT_NODE_BOOTSTRAP_CFG_TEMPLATE = "ignite {\n"
+            + "  network: {\n"
+            + "    port: {},\n"
+            + "    nodeFinder: {\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    },\n"
+            + "    membership: {\n"
+            + "      membershipSyncInterval: 1000,\n"
+            + "      failurePingInterval: 500,\n"
+            + "      scaleCube: {\n"
+            + "        membershipSuspicionMultiplier: 1,\n"
+            + "        failurePingRequestMembers: 1,\n"
+            + "        gossipInterval: 10\n"
+            + "      },\n"
+            + "    }\n"
+            + "  },\n"
+            + "  storage: {\n"
+            + "    engines: {\n"
+            + "      aipersist: {\n"
+            + "        checkpoint: {\n"
+            + "          interval: " + AIPERSIST_CHECKPOINT_INTERVAL_MS + "\n"
+            + "        }\n"
+            + "      }\n"
+            + "    }\n"
+            + "  }\n"
+            + "  clientConnector: { port:{} }, \n"
+            + "  rest.port: {},\n"
+            + "  failureHandler.dumpThreadsOnFailure: false\n"
+            + "}";
+
+    @Override
+    protected int initialNodes() {
+        return 3;
+    }
+
+    @Override
+    protected String getNodeBootstrapConfigTemplate() {
+        return FAST_FAILURE_DETECTION_AND_FAST_CHECKPOINT_NODE_BOOTSTRAP_CFG_TEMPLATE;
+    }
+
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        // Configurations to short the catalog compaction time.
+        String clusterConfiguration = format(
+                "ignite {\n"
+                        + "gc: {lowWatermark: {dataAvailabilityTime: {}, updateInterval: {}}},\n"
+                        + "}",
+                LW_UPDATE_TIME_MS * 2, LW_UPDATE_TIME_MS
+        );
+
+        builder.clusterConfiguration(clusterConfiguration);
+    }
+
+    @Test
+    void testHaRecoveryOnZoneTimersRestoreAfterCatalogCompactionAndNodeRestart() throws InterruptedException {
+        IgniteImpl node = igniteImpl(0);
+
+        changePartitionDistributionTimeout(node, 10);
+
+        createHaZoneWithTable();
+
+        assertRecoveryKeyIsEmpty(node);
+
+        // Await for catalog compaction
+        expectEarliestCatalogVersionGreaterThanZero();
+
+        stopNodes(2, 1, 0);
+
+        IgniteImpl node1 = unwrapIgniteImpl(startNode(0));
+
+        waitAndAssertRecoveryKeyIsNotEmpty(node1, 30_000);
+
+        assertRecoveryRequestForHaZoneTable(node1);
+        assertRecoveryRequestWasOnlyOne(node1);
+
+        waitAndAssertStableAssignmentsOfPartitionEqualTo(node1, HA_TABLE_NAME, PARTITION_IDS, Set.of(node1.name()));
+    }
+
+    private void expectEarliestCatalogVersionGreaterThanZero() {
+        Awaitility.await().timeout(CATALOG_COMPACTION_AWAIT_INTERVAL_MS, TimeUnit.MILLISECONDS).untilAsserted(() -> {
+            for (var node : runningNodes().collect(Collectors.toList())) {
+                IgniteImpl ignite = unwrapIgniteImpl(node);
+                CatalogManagerImpl catalogManager = ((CatalogManagerImpl) ignite.catalogManager());
+
+                assertThat("The earliest catalog version does not match. ",
+                        catalogManager.earliestCatalogVersion(), greaterThan(0));
+            }
+        });
+    }
+}

Reply via email to