This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ccafaca75f3 IGNITE-26640 Fix ItRebalanceMetricsTest (#6749)
ccafaca75f3 is described below
commit ccafaca75f3b33e5f86a8844b13bc668271cf890
Author: Slava Koptilin <[email protected]>
AuthorDate: Mon Oct 13 17:42:12 2025 +0300
IGNITE-26640 Fix ItRebalanceMetricsTest (#6749)
---
.../internal/rebalance/ItRebalanceMetricsTest.java | 194 +++++++++++++--------
1 file changed, 121 insertions(+), 73 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
index c268cc3a707..367839753cc 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
@@ -28,21 +28,21 @@ import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpd
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.distributionzones.ZoneMetricSource;
import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metrics.IntMetric;
import org.apache.ignite.internal.metrics.MetricSet;
@@ -56,6 +56,44 @@ import org.junit.jupiter.api.condition.EnabledIf;
@EnabledIf("org.apache.ignite.internal.lang.IgniteSystemProperties#colocationEnabled")
public class ItRebalanceMetricsTest extends ClusterPerTestIntegrationTest {
private static final String ZONE_NAME = "TEST_ZONE";
+ private static final String ZONE_NAME_TO_RENAME = "TEST_ZONE_TO_RENAME";
+
+ static class PendingAssignmentsWatchListener implements WatchListener {
+ private final AtomicBoolean skipEventProcessing;
+ private final CountDownLatch latch;
+ private final String pendingKey;
+
+ PendingAssignmentsWatchListener(String pendingKey, CountDownLatch
latch, AtomicBoolean skipEventProcessing) {
+ this.pendingKey = pendingKey;
+ this.latch = latch;
+ this.skipEventProcessing = skipEventProcessing;
+ }
+
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ if (skipEventProcessing.get()) {
+ return nullCompletedFuture();
+ }
+
+ Entry entry = event.entryEvent().newEntry();
+
+ if (entry.value() == null) {
+ return nullCompletedFuture();
+ }
+
+ var eventKey = new String(entry.key());
+
+ if (eventKey.startsWith(pendingKey)) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return nullCompletedFuture();
+ }
+ }
@Override
protected int initialNodes() {
@@ -64,116 +102,92 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
@Test
void testRebalanceMetrics() throws Exception {
- // Create a zone with 7 partitions, 1 replica, and auto scale up set
to Integer.MAX_VALUE.
- createZone(ZONE_NAME, 7, 1, Integer.MAX_VALUE);
+ int partitionCount = 7;
+ int replicaCount = 1;
+ int scaleUpTimeout = Integer.MAX_VALUE;
+
+ createZone(ZONE_NAME, partitionCount, replicaCount, scaleUpTimeout);
IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
int zoneId = getZoneIdStrict(node0.catalogManager(), ZONE_NAME,
node0.clock().nowLong());
- // Calculate target assignments for 7 partitions and 1 replica across
2 nodes.
+ // Calculate target assignments for the zone.
List<Set<Assignment>> targetAssignments = calculateAssignments(
List.of(cluster.nodeName(0), cluster.nodeName(1)),
- 7,
- 1,
+ partitionCount,
+ replicaCount,
1);
// Number of partitions that should be moved to the node 1.
- long partCnt = targetAssignments
+ int partitionCountToRebalance = (int) targetAssignments
.stream()
.filter(assignments ->
assignments.contains(Assignment.forPeer(cluster.nodeName(1))))
.count();
- // local and total unrebalanced partitions on node 0
- checkRebalanceMetrics(node0, ZONE_NAME, 0, 0);
-
- AtomicBoolean expected0 = new AtomicBoolean();
- AtomicBoolean expected1 = new AtomicBoolean();
-
- WatchListener listener = event -> {
- Entry entry = event.entryEvent().newEntry();
-
- if (entry.value() == null) {
- return nullCompletedFuture();
- }
-
- var partitionId = new String(entry.key());
-
- if
(partitionId.startsWith(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX)) {
- // let's check metrics:
- MetricSet zoneMetric0 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)));
- IntMetric local0 =
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
- IntMetric total0 =
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
-
- MetricSet zoneMetric1 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(1)));
- IntMetric local1 =
zoneMetric1.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
- IntMetric total1 =
zoneMetric1.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+ assertRebalanceMetrics(node0, ZONE_NAME, 0, 0);
- // Expected metric values:
- // node0: localUnrebalancedPartitions == 0,
totalUnrebalancedPartitions == partCnt
- // node1: localUnrebalancedPartitions == partCnt,
totalUnrebalancedPartitions == partCnt
- if (local0.value() == 0 && total0.value() == partCnt) {
- expected0.set(true);
- }
- if (local1.value() == partCnt && total1.value() == partCnt) {
- expected1.set(true);
- }
- }
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean skipEventProcessing = new AtomicBoolean(false);
+ String pendingAssignmentsKey =
ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX + zoneId + "_part_";
+ ByteArray pendingAssignmentsBytes = new
ByteArray(pendingAssignmentsKey);
- return nullCompletedFuture();
- };
+ WatchListener listener = new
PendingAssignmentsWatchListener(pendingAssignmentsKey, latch,
skipEventProcessing);
// Start a new node and register a watch listener for pending
assignments.
- Ignite node1 = startNode(1);
- MetaStorageManager metastorage =
unwrapIgniteImpl(node1).metaStorageManager();
+ IgniteImpl node1 = unwrapIgniteImpl(startNode(1));
- ByteArray key = new
ByteArray(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX + zoneId +
"_part_");
- metastorage.registerPrefixWatch(key, listener);
+
node0.metaStorageManager().registerPrefixWatch(pendingAssignmentsBytes,
listener);
+
node1.metaStorageManager().registerPrefixWatch(pendingAssignmentsBytes,
listener);
// Set auto scale up timer to 0 in order to trigger rebalance
immediately.
cluster.doInSession(0, session -> {
executeUpdate(format("alter zone {} set auto scale up {}",
ZONE_NAME, 0), session);
});
- boolean res = waitForCondition(() -> expected0.get() &&
expected1.get(), 30_000, 200);
+ boolean res = waitForCondition(() -> checkRebalanceMetrics(node0,
ZONE_NAME, 0, partitionCountToRebalance)
+ && checkRebalanceMetrics(node1, ZONE_NAME,
partitionCountToRebalance, partitionCountToRebalance), 100, 30_000);
+
if (!res) {
- // Log metric values.
+ log.warn(">>>>> partitions to rebalance = " +
partitionCountToRebalance);
MetricSet zoneMetric0 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)));
zoneMetric0.iterator().forEachRemaining(metric ->
- log.warn(">>>>> metrics 0 [name=" + metric.name() + ",
value=" + metric.getValueAsString()));
+ log.warn(">>>>> metrics 0 [name=" + metric.name() + ",
value=" + metric.getValueAsString() + ']'));
MetricSet zoneMetric1 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(1)));
zoneMetric1.iterator().forEachRemaining(metric ->
- log.warn(">>>>> metrics 1 [name=" + metric.name() + ",
value=" + metric.getValueAsString()));
+ log.warn(">>>>> metrics 1 [name=" + metric.name() + ",
value=" + metric.getValueAsString() + ']'));
}
+ // Unblock rebalance.
+ skipEventProcessing.set(true);
+ latch.countDown();
+
+ assertThat(res, is(true));
+
+ res = waitForCondition(() -> checkRebalanceMetrics(node0, ZONE_NAME,
0, 0)
+ && checkRebalanceMetrics(node1, ZONE_NAME, 0, 0), 100, 30_000);
+
assertThat(res, is(true));
}
@Test
void testZoneRenaming() {
- // Create a zone with 7 partitions, 1 replica, and auto scale up set
to Integer.MAX_VALUE.
- createZone(ZONE_NAME, 7, 1, Integer.MAX_VALUE);
+ createZone(ZONE_NAME_TO_RENAME, 7, 1, Integer.MAX_VALUE);
- MetricSet zoneMetric0 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), ZONE_NAME);
+ MetricSet zoneMetric0 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), ZONE_NAME_TO_RENAME);
assertThat(zoneMetric0, is(notNullValue()));
- IntMetric local0 =
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
- IntMetric total0 =
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
- assertThat(local0, is(notNullValue()));
- assertThat(total0, is(notNullValue()));
-
- renameZone(ZONE_NAME, "NEW_" + ZONE_NAME);
+ IntMetric localUnrebalancedPartitionsMetric =
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+ IntMetric totalUnrebalancedPartitionsMetric =
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+ assertThat(localUnrebalancedPartitionsMetric, is(notNullValue()));
+ assertThat(totalUnrebalancedPartitionsMetric, is(notNullValue()));
- MetricSet zoneMetric1 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), "NEW_" + ZONE_NAME);
- assertThat(zoneMetric1, is(notNullValue()));
+ int local = localUnrebalancedPartitionsMetric.value();
+ int total = totalUnrebalancedPartitionsMetric.value();
- IntMetric local1 =
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
- IntMetric total1 =
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
- assertThat(local1, is(notNullValue()));
- assertThat(total1, is(notNullValue()));
+ renameZone(ZONE_NAME_TO_RENAME, "RENAMED_" + ZONE_NAME_TO_RENAME);
- assertThat(local0.value(), equalTo(local1.value()));
- assertThat(total0.value(), equalTo(total1.value()));
+ assertRebalanceMetrics(unwrapIgniteImpl(cluster.node(0)), "RENAMED_" +
ZONE_NAME_TO_RENAME, local, total);
}
/**
@@ -218,7 +232,7 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
* @param localUnrebalanced Expected number of local unrebalanced
partitions.
* @param totalUnrebalanced Expected total number of unrebalanced
partitions.
*/
- private static void checkRebalanceMetrics(
+ private static void assertRebalanceMetrics(
IgniteImpl ignite,
String zoneName,
int localUnrebalanced,
@@ -228,8 +242,42 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
assertThat(metrics, is(notNullValue()));
- assertThat(((IntMetric)
metrics.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT)).value(),
is(localUnrebalanced));
- assertThat(((IntMetric)
metrics.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT)).value(),
is(totalUnrebalanced));
+ IntMetric local = metrics.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+ IntMetric total = metrics.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+
+ assertThat(local, is(notNullValue()));
+ assertThat(total, is(notNullValue()));
+
+ assertThat(local.value(), is(localUnrebalanced));
+ assertThat(total.value(), is(totalUnrebalanced));
+ }
+
+ /**
+ * Returns {@code true} zone metrics are equal to the given parameters.
+ *
+ * @param ignite Ignite instance.
+ * @param zoneName Name of the zone.
+ * @param localUnrebalanced Expected number of local unrebalanced
partitions.
+ * @param totalUnrebalanced Expected total number of unrebalanced
partitions.
+ * @return {@code true} if metrics are equal to the given parameters.
+ */
+ private static boolean checkRebalanceMetrics(
+ IgniteImpl ignite,
+ String zoneName,
+ int localUnrebalanced,
+ int totalUnrebalanced
+ ) {
+ MetricSet metrics = zoneMetricSet(ignite, zoneName);
+
+ if (metrics == null) {
+ return false;
+ }
+
+ IntMetric local = metrics.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+ IntMetric total = metrics.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+
+ return local != null && total != null
+ && local.value() == localUnrebalanced && total.value() ==
totalUnrebalanced;
}
/**