This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ccafaca75f3 IGNITE-26640 Fix ItRebalanceMetricsTest (#6749)
ccafaca75f3 is described below

commit ccafaca75f3b33e5f86a8844b13bc668271cf890
Author: Slava Koptilin <[email protected]>
AuthorDate: Mon Oct 13 17:42:12 2025 +0300

    IGNITE-26640 Fix ItRebalanceMetricsTest (#6749)
---
 .../internal/rebalance/ItRebalanceMetricsTest.java | 194 +++++++++++++--------
 1 file changed, 121 insertions(+), 73 deletions(-)

diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
index c268cc3a707..367839753cc 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
@@ -28,21 +28,21 @@ import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpd
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.distributionzones.ZoneMetricSource;
 import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metrics.IntMetric;
 import org.apache.ignite.internal.metrics.MetricSet;
@@ -56,6 +56,44 @@ import org.junit.jupiter.api.condition.EnabledIf;
 
@EnabledIf("org.apache.ignite.internal.lang.IgniteSystemProperties#colocationEnabled")
 public class ItRebalanceMetricsTest extends ClusterPerTestIntegrationTest {
     private static final String ZONE_NAME = "TEST_ZONE";
+    private static final String ZONE_NAME_TO_RENAME = "TEST_ZONE_TO_RENAME";
+
+    static class PendingAssignmentsWatchListener implements WatchListener {
+        private final AtomicBoolean skipEventProcessing;
+        private final CountDownLatch latch;
+        private final String pendingKey;
+
+        PendingAssignmentsWatchListener(String pendingKey, CountDownLatch 
latch, AtomicBoolean skipEventProcessing) {
+            this.pendingKey = pendingKey;
+            this.latch = latch;
+            this.skipEventProcessing = skipEventProcessing;
+        }
+
+        @Override
+        public CompletableFuture<Void> onUpdate(WatchEvent event) {
+            if (skipEventProcessing.get()) {
+                return nullCompletedFuture();
+            }
+
+            Entry entry = event.entryEvent().newEntry();
+
+            if (entry.value() == null) {
+                return nullCompletedFuture();
+            }
+
+            var eventKey = new String(entry.key());
+
+            if (eventKey.startsWith(pendingKey)) {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return nullCompletedFuture();
+        }
+    }
 
     @Override
     protected int initialNodes() {
@@ -64,116 +102,92 @@ public class ItRebalanceMetricsTest extends 
ClusterPerTestIntegrationTest {
 
     @Test
     void testRebalanceMetrics() throws Exception {
-        // Create a zone with 7 partitions, 1 replica, and auto scale up set 
to Integer.MAX_VALUE.
-        createZone(ZONE_NAME, 7, 1, Integer.MAX_VALUE);
+        int partitionCount = 7;
+        int replicaCount = 1;
+        int scaleUpTimeout = Integer.MAX_VALUE;
+
+        createZone(ZONE_NAME, partitionCount, replicaCount, scaleUpTimeout);
 
         IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
         int zoneId = getZoneIdStrict(node0.catalogManager(), ZONE_NAME, 
node0.clock().nowLong());
 
-        // Calculate target assignments for 7 partitions and 1 replica across 
2 nodes.
+        // Calculate target assignments for the zone.
         List<Set<Assignment>> targetAssignments = calculateAssignments(
                 List.of(cluster.nodeName(0), cluster.nodeName(1)),
-                7,
-                1,
+                partitionCount,
+                replicaCount,
                 1);
 
         // Number of partitions that should be moved to the node 1.
-        long partCnt = targetAssignments
+        int partitionCountToRebalance = (int) targetAssignments
                 .stream()
                 .filter(assignments -> 
assignments.contains(Assignment.forPeer(cluster.nodeName(1))))
                 .count();
 
-        // local and total unrebalanced partitions on node 0
-        checkRebalanceMetrics(node0, ZONE_NAME, 0, 0);
-
-        AtomicBoolean expected0 = new AtomicBoolean();
-        AtomicBoolean expected1 = new AtomicBoolean();
-
-        WatchListener listener = event -> {
-            Entry entry = event.entryEvent().newEntry();
-
-            if (entry.value() == null) {
-                return nullCompletedFuture();
-            }
-
-            var partitionId = new String(entry.key());
-
-            if 
(partitionId.startsWith(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX)) {
-                // let's check metrics:
-                MetricSet zoneMetric0 = 
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)));
-                IntMetric local0 = 
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
-                IntMetric total0 = 
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
-
-                MetricSet zoneMetric1 = 
zoneMetricSet(unwrapIgniteImpl(cluster.node(1)));
-                IntMetric local1 = 
zoneMetric1.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
-                IntMetric total1 = 
zoneMetric1.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+        assertRebalanceMetrics(node0, ZONE_NAME, 0, 0);
 
-                // Expected metric values:
-                // node0: localUnrebalancedPartitions == 0, 
totalUnrebalancedPartitions == partCnt
-                // node1: localUnrebalancedPartitions == partCnt, 
totalUnrebalancedPartitions == partCnt
-                if (local0.value() == 0 && total0.value() == partCnt) {
-                    expected0.set(true);
-                }
-                if (local1.value() == partCnt && total1.value() == partCnt) {
-                    expected1.set(true);
-                }
-            }
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean skipEventProcessing = new AtomicBoolean(false);
+        String pendingAssignmentsKey = 
ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX + zoneId + "_part_";
+        ByteArray pendingAssignmentsBytes = new 
ByteArray(pendingAssignmentsKey);
 
-            return nullCompletedFuture();
-        };
+        WatchListener listener = new 
PendingAssignmentsWatchListener(pendingAssignmentsKey, latch, 
skipEventProcessing);
 
         // Start a new node and register a watch listener for pending 
assignments.
-        Ignite node1 = startNode(1);
-        MetaStorageManager metastorage = 
unwrapIgniteImpl(node1).metaStorageManager();
+        IgniteImpl node1 = unwrapIgniteImpl(startNode(1));
 
-        ByteArray key = new 
ByteArray(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX + zoneId + 
"_part_");
-        metastorage.registerPrefixWatch(key, listener);
+        
node0.metaStorageManager().registerPrefixWatch(pendingAssignmentsBytes, 
listener);
+        
node1.metaStorageManager().registerPrefixWatch(pendingAssignmentsBytes, 
listener);
 
         // Set auto scale up timer to 0 in order to trigger rebalance 
immediately.
         cluster.doInSession(0, session -> {
             executeUpdate(format("alter zone {} set auto scale up {}", 
ZONE_NAME, 0), session);
         });
 
-        boolean res = waitForCondition(() -> expected0.get() && 
expected1.get(), 30_000, 200);
+        boolean res = waitForCondition(() -> checkRebalanceMetrics(node0, 
ZONE_NAME, 0, partitionCountToRebalance)
+                && checkRebalanceMetrics(node1, ZONE_NAME, 
partitionCountToRebalance, partitionCountToRebalance), 100, 30_000);
+
         if (!res) {
-            // Log metric values.
+            log.warn(">>>>> partitions to rebalance = " + 
partitionCountToRebalance);
             MetricSet zoneMetric0 = 
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)));
             zoneMetric0.iterator().forEachRemaining(metric ->
-                    log.warn(">>>>> metrics 0 [name=" + metric.name() + ", 
value=" + metric.getValueAsString()));
+                    log.warn(">>>>> metrics 0 [name=" + metric.name() + ", 
value=" + metric.getValueAsString() + ']'));
 
             MetricSet zoneMetric1 = 
zoneMetricSet(unwrapIgniteImpl(cluster.node(1)));
             zoneMetric1.iterator().forEachRemaining(metric ->
-                    log.warn(">>>>> metrics 1 [name=" + metric.name() + ", 
value=" + metric.getValueAsString()));
+                    log.warn(">>>>> metrics 1 [name=" + metric.name() + ", 
value=" + metric.getValueAsString() + ']'));
         }
 
+        // Unblock rebalance.
+        skipEventProcessing.set(true);
+        latch.countDown();
+
+        assertThat(res, is(true));
+
+        res = waitForCondition(() -> checkRebalanceMetrics(node0, ZONE_NAME, 
0, 0)
+                && checkRebalanceMetrics(node1, ZONE_NAME, 0, 0), 100, 30_000);
+
         assertThat(res, is(true));
     }
 
     @Test
     void testZoneRenaming() {
-        // Create a zone with 7 partitions, 1 replica, and auto scale up set 
to Integer.MAX_VALUE.
-        createZone(ZONE_NAME, 7, 1, Integer.MAX_VALUE);
+        createZone(ZONE_NAME_TO_RENAME, 7, 1, Integer.MAX_VALUE);
 
-        MetricSet zoneMetric0 = 
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), ZONE_NAME);
+        MetricSet zoneMetric0 = 
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), ZONE_NAME_TO_RENAME);
         assertThat(zoneMetric0, is(notNullValue()));
 
-        IntMetric local0 = 
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
-        IntMetric total0 = 
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
-        assertThat(local0, is(notNullValue()));
-        assertThat(total0, is(notNullValue()));
-
-        renameZone(ZONE_NAME, "NEW_" + ZONE_NAME);
+        IntMetric localUnrebalancedPartitionsMetric = 
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+        IntMetric totalUnrebalancedPartitionsMetric = 
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+        assertThat(localUnrebalancedPartitionsMetric, is(notNullValue()));
+        assertThat(totalUnrebalancedPartitionsMetric, is(notNullValue()));
 
-        MetricSet zoneMetric1 = 
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), "NEW_" + ZONE_NAME);
-        assertThat(zoneMetric1, is(notNullValue()));
+        int local = localUnrebalancedPartitionsMetric.value();
+        int total  = totalUnrebalancedPartitionsMetric.value();
 
-        IntMetric local1 = 
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
-        IntMetric total1 = 
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
-        assertThat(local1, is(notNullValue()));
-        assertThat(total1, is(notNullValue()));
+        renameZone(ZONE_NAME_TO_RENAME, "RENAMED_" + ZONE_NAME_TO_RENAME);
 
-        assertThat(local0.value(), equalTo(local1.value()));
-        assertThat(total0.value(), equalTo(total1.value()));
+        assertRebalanceMetrics(unwrapIgniteImpl(cluster.node(0)), "RENAMED_" + 
ZONE_NAME_TO_RENAME, local, total);
     }
 
     /**
@@ -218,7 +232,7 @@ public class ItRebalanceMetricsTest extends 
ClusterPerTestIntegrationTest {
      * @param localUnrebalanced Expected number of local unrebalanced 
partitions.
      * @param totalUnrebalanced Expected total number of unrebalanced 
partitions.
      */
-    private static void checkRebalanceMetrics(
+    private static void assertRebalanceMetrics(
             IgniteImpl ignite,
             String zoneName,
             int localUnrebalanced,
@@ -228,8 +242,42 @@ public class ItRebalanceMetricsTest extends 
ClusterPerTestIntegrationTest {
 
         assertThat(metrics, is(notNullValue()));
 
-        assertThat(((IntMetric) 
metrics.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT)).value(), 
is(localUnrebalanced));
-        assertThat(((IntMetric) 
metrics.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT)).value(), 
is(totalUnrebalanced));
+        IntMetric local = metrics.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+        IntMetric total = metrics.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+
+        assertThat(local, is(notNullValue()));
+        assertThat(total, is(notNullValue()));
+
+        assertThat(local.value(), is(localUnrebalanced));
+        assertThat(total.value(), is(totalUnrebalanced));
+    }
+
+    /**
+     * Returns {@code true} if zone metrics are equal to the given parameters.
+     *
+     * @param ignite Ignite instance.
+     * @param zoneName Name of the zone.
+     * @param localUnrebalanced Expected number of local unrebalanced 
partitions.
+     * @param totalUnrebalanced Expected total number of unrebalanced 
partitions.
+     * @return {@code true} if metrics are equal to the given parameters.
+     */
+    private static boolean checkRebalanceMetrics(
+            IgniteImpl ignite,
+            String zoneName,
+            int localUnrebalanced,
+            int totalUnrebalanced
+    ) {
+        MetricSet metrics = zoneMetricSet(ignite, zoneName);
+
+        if (metrics == null) {
+            return false;
+        }
+
+        IntMetric local = metrics.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+        IntMetric total = metrics.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+
+        return local != null && total != null
+                && local.value() == localUnrebalanced && total.value() == 
totalUnrebalanced;
     }
 
     /**

Reply via email to