This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b4a0465ad22 IGNITE-26525 Reuse metric values on zone renaming (#6737)
b4a0465ad22 is described below
commit b4a0465ad22886bdaf10233f9865f7f82b1b8d2e
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Oct 10 15:52:26 2025 +0300
IGNITE-26525 Reuse metric values on zone renaming (#6737)
---
.../internal/rebalance/ItRebalanceMetricsTest.java | 70 ++++++++++++++--
.../distributionzones/DistributionZoneManager.java | 79 ++++++++++--------
.../distributionzones/ZoneMetricSource.java | 94 ++++++++++++++++++----
.../internal/metrics/AbstractMetricSource.java | 15 ++++
4 files changed, 201 insertions(+), 57 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
index d896fc0f2c0..c268cc3a707 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceMetricsTest.java
@@ -28,6 +28,7 @@ import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpd
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -83,7 +84,7 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
.count();
// local and total unrebalanced partitions on node 0
- checkRebalanceMetrics(node0, 0, 0);
+ checkRebalanceMetrics(node0, ZONE_NAME, 0, 0);
AtomicBoolean expected0 = new AtomicBoolean();
AtomicBoolean expected1 = new AtomicBoolean();
@@ -148,6 +149,33 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
assertThat(res, is(true));
}
+ @Test
+ void testZoneRenaming() {
+ // Create a zone with 7 partitions, 1 replica, and auto scale up set
to Integer.MAX_VALUE.
+ createZone(ZONE_NAME, 7, 1, Integer.MAX_VALUE);
+
+ MetricSet zoneMetric0 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), ZONE_NAME);
+ assertThat(zoneMetric0, is(notNullValue()));
+
+ IntMetric local0 =
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+ IntMetric total0 =
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+ assertThat(local0, is(notNullValue()));
+ assertThat(total0, is(notNullValue()));
+
+ renameZone(ZONE_NAME, "NEW_" + ZONE_NAME);
+
+ MetricSet zoneMetric1 =
zoneMetricSet(unwrapIgniteImpl(cluster.node(0)), "NEW_" + ZONE_NAME);
+ assertThat(zoneMetric1, is(notNullValue()));
+
+ IntMetric local1 =
zoneMetric0.get(LOCAL_UNREBALANCED_PARTITIONS_COUNT);
+ IntMetric total1 =
zoneMetric0.get(TOTAL_UNREBALANCED_PARTITIONS_COUNT);
+ assertThat(local1, is(notNullValue()));
+ assertThat(total1, is(notNullValue()));
+
+ assertThat(local0.value(), equalTo(local1.value()));
+ assertThat(total0.value(), equalTo(total1.value()));
+ }
+
/**
* Creates a distribution zone with the specified parameters.
*
@@ -157,14 +185,28 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
* @param scaleUp Auto scale up value.
*/
private void createZone(String zoneName, int partitions, int replicas, int
scaleUp) {
- String sql1 = String.format("create zone %s "
+ String sql = String.format("create zone %s "
+ "(partitions %d, replicas %d, "
+ "auto scale up %d, "
+ "auto scale down 0) "
+ "storage profiles ['%s']", zoneName, partitions, replicas,
scaleUp, DEFAULT_STORAGE_PROFILE);
cluster.doInSession(0, session -> {
- executeUpdate(sql1, session);
+ executeUpdate(sql, session);
+ });
+ }
+
+ /**
+ * Renames a distribution zone.
+ *
+ * @param oldName Current name of the zone.
+ * @param newName New name of the zone.
+ */
+ private void renameZone(String oldName, String newName) {
+ String sql = String.format("alter zone %s rename to %s", oldName,
newName);
+
+ cluster.doInSession(0, session -> {
+ executeUpdate(sql, session);
});
}
@@ -172,11 +214,17 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
* Checks the rebalance metrics for the given Ignite instance.
*
* @param ignite Ignite instance.
+ * @param zoneName Name of the zone.
* @param localUnrebalanced Expected number of local unrebalanced
partitions.
* @param totalUnrebalanced Expected total number of unrebalanced
partitions.
*/
- private static void checkRebalanceMetrics(IgniteImpl ignite, int
localUnrebalanced, int totalUnrebalanced) {
- MetricSet metrics = zoneMetricSet(ignite);
+ private static void checkRebalanceMetrics(
+ IgniteImpl ignite,
+ String zoneName,
+ int localUnrebalanced,
+ int totalUnrebalanced
+ ) {
+ MetricSet metrics = zoneMetricSet(ignite, zoneName);
assertThat(metrics, is(notNullValue()));
@@ -191,10 +239,20 @@ public class ItRebalanceMetricsTest extends
ClusterPerTestIntegrationTest {
* @return MetricSet for the zone.
*/
private static MetricSet zoneMetricSet(IgniteImpl ignite) {
+ return zoneMetricSet(ignite, ZONE_NAME);
+ }
+
+ /**
+ * Returns the metric set for the given {@code zone}.
+ *
+ * @param ignite Ignite instance.
+ * @return MetricSet for the zone.
+ */
+ private static MetricSet zoneMetricSet(IgniteImpl ignite, String zone) {
return ignite
.metricManager()
.metricSnapshot()
.metrics()
- .get(ZoneMetricSource.SOURCE_NAME + '.' + ZONE_NAME);
+ .get(ZoneMetricSource.sourceName(zone));
}
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index baa9fb61976..47f4949fe25 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -446,15 +446,7 @@ public class DistributionZoneManager extends
return dataNodesManager
.onZoneCreate(zone.id(), timestamp, filteredDataNodes)
- .thenRun(() -> {
- try {
- registerMetricSource(zone);
- } catch (Exception e) {
- // This is not a critical error, so there is no need
to stop node if we failed to register a metric source.
- // So, just log the error.
- LOG.error("Failed to register a new zone metric source
[zoneDescriptor={}]", e, zone);
- }
- });
+ .thenRun(() -> registerMetricSource(zone));
}
/**
@@ -795,12 +787,47 @@ public class DistributionZoneManager extends
* @param zone Zone descriptor.
*/
private void registerMetricSource(CatalogZoneDescriptor zone) {
- ZoneMetricSource source = new ZoneMetricSource(metaStorageManager,
localNodeName, zone);
+ registerMetricSource(zone, null);
+ }
+
+ /**
+ * Registers metric source for the specified zone.
+ *
+ * @param zone Zone descriptor.
+ * @param copyFrom Source to copy metrics from.
+ */
+ private void registerMetricSource(CatalogZoneDescriptor zone, @Nullable
ZoneMetricSource copyFrom) {
+ try {
+ ZoneMetricSource source = (copyFrom == null)
+ ? new ZoneMetricSource(metaStorageManager, localNodeName,
zone)
+ : new ZoneMetricSource(metaStorageManager, localNodeName,
zone, copyFrom);
+
+ zoneMetricSources.put(zone.id(), source);
+
+ metricManager.registerSource(source);
+ metricManager.enable(source);
+ } catch (Exception e) {
+ LOG.error("Failed to register zone metric source [zoneName={},
zoneId={}]", e, zone.name(), zone.id());
+ }
+ }
+
+ /**
+ * Unregisters metric source for the specified zone.
+ *
+ * @param zoneId Zone identifier.
+ */
+ private void unregisterMetricSource(int zoneId) {
+ ZoneMetricSource source = zoneMetricSources.remove(zoneId);
- zoneMetricSources.put(zone.id(), source);
+ if (source == null) {
+ return;
+ }
- metricManager.registerSource(source);
- metricManager.enable(source);
+ try {
+ metricManager.unregisterSource(source);
+ } catch (Exception e) {
+ LOG.error("Failed to unregister zone metric source [zoneName={},
zoneId={}]", e, source.zoneName(), zoneId);
+ }
}
/**
@@ -837,14 +864,7 @@ public class DistributionZoneManager extends
}
private CompletableFuture<?> onDropZoneBusy(DropZoneEventParameters
parameters) {
- try {
- ZoneMetricSource source =
zoneMetricSources.remove(parameters.zoneId());
- if (source != null) {
- metricManager.unregisterSource(source);
- }
- } catch (Exception e) {
- LOG.error("Failed to unregister zone metric source
[dropZoneEvent={}]", e, parameters);
- }
+ unregisterMetricSource(parameters.zoneId());
long causalityToken = parameters.causalityToken();
@@ -876,21 +896,10 @@ public class DistributionZoneManager extends
@Override
protected CompletableFuture<Void>
onNameUpdate(AlterZoneEventParameters parameters, String oldName) {
return inBusyLock(busyLock, () -> {
- try {
- CatalogZoneDescriptor zoneDescriptor =
parameters.zoneDescriptor();
+ ZoneMetricSource oldSource =
zoneMetricSources.get(parameters.zoneDescriptor().id());
- // Update metric source name.
- ZoneMetricSource source =
zoneMetricSources.remove(zoneDescriptor.id());
- if (source != null) {
- metricManager.unregisterSource(source);
- }
-
- registerMetricSource(parameters.zoneDescriptor());
- } catch (Exception e) {
- // This is not a critical error, so there is no need to
stop node if we failed to register a metric source.
- // So, just log the error.
- LOG.error("Failed to update zone metric set
[alterZoneEvent={}]", e, parameters);
- }
+ unregisterMetricSource(parameters.zoneDescriptor().id());
+ registerMetricSource(parameters.zoneDescriptor(), oldSource);
return nullCompletedFuture();
});
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/ZoneMetricSource.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/ZoneMetricSource.java
index e7a15019f06..0239bb4448e 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/ZoneMetricSource.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/ZoneMetricSource.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.jetbrains.annotations.Nullable;
/**
* Distribution metric source for a specific zone.
@@ -49,8 +50,14 @@ public class ZoneMetricSource extends
AbstractMetricSource<ZoneMetricSource.Hold
/** Node name, aka consistent identifier. */
private final String nodeName;
- /** Zone descriptor. */
- public final CatalogZoneDescriptor zoneDescriptor;
+ /** Zone identifier. */
+ private final int zoneId;
+
+ /** Number of partitions in the zone. */
+ private final int partitions;
+
+ /** Zone name. */
+ private final String zoneName;
/**
* Creates a new zone metric source for a specific zone.
@@ -60,16 +67,65 @@ public class ZoneMetricSource extends
AbstractMetricSource<ZoneMetricSource.Hold
* @param zoneDescriptor Zone descriptor.
*/
public ZoneMetricSource(MetaStorageManager metaStorageManager, String
consistentId, CatalogZoneDescriptor zoneDescriptor) {
- super(SOURCE_NAME + '.' + zoneDescriptor.name(), "Distribution zone
metrics.", "zones");
+ super(sourceName(zoneDescriptor.name()), "Distribution zone metrics.",
"zones");
+
+ this.nodeName = consistentId;
+ this.metaStorageManager = metaStorageManager;
+ this.zoneId = zoneDescriptor.id();
+ this.partitions = zoneDescriptor.partitions();
+ this.zoneName = zoneDescriptor.name();
+ }
+
+ /**
+ * Creates a new zone metric source for a specific zone.
+ *
+ * @param metaStorageManager Meta Storage manager.
+ * @param consistentId Name of the node.
+ * @param zoneDescriptor Zone descriptor.
+ * @param source Source to copy metrics from.
+ */
+ public ZoneMetricSource(
+ MetaStorageManager metaStorageManager,
+ String consistentId,
+ CatalogZoneDescriptor zoneDescriptor,
+ ZoneMetricSource source
+ ) {
+ super(sourceName(zoneDescriptor.name()), "Distribution zone metrics.",
"zones", Holder.copyFrom(source));
+
+ assert zoneDescriptor.id() == source.zoneId :
+ "Zone ID mismatch [expected=" + zoneDescriptor.id() + ",
actual=" + source.zoneId + ']';
+ assert zoneDescriptor.partitions() == source.partitions :
+ "Partitions count mismatch [expected=" +
zoneDescriptor.partitions() + ", actual=" + source.partitions + ']';
this.nodeName = consistentId;
- this.zoneDescriptor = zoneDescriptor;
this.metaStorageManager = metaStorageManager;
+ this.zoneId = zoneDescriptor.id();
+ this.partitions = zoneDescriptor.partitions();
+ this.zoneName = zoneDescriptor.name();
+ }
+
+ /**
+ * Returns a metric source name for the given distribution zone.
+ *
+ * @param zoneName Zone name.
+ * @return Source name.
+ */
+ public static String sourceName(String zoneName) {
+ return SOURCE_NAME + '.' + zoneName;
}
@Override
protected Holder createHolder() {
- return new Holder(this);
+ return new Holder(zoneId, partitions, metaStorageManager, nodeName);
+ }
+
+ /**
+ * Returns the name of the zone.
+ *
+ * @return Zone name.
+ */
+ public String zoneName() {
+ return zoneName;
}
/** Holder. */
@@ -77,29 +133,35 @@ public class ZoneMetricSource extends
AbstractMetricSource<ZoneMetricSource.Hold
/** List of actual metrics. */
private final List<Metric> metrics;
- Holder(ZoneMetricSource source) {
+ static @Nullable Holder copyFrom(ZoneMetricSource source) {
+ // All metrics are gauge and must relate to the same zone.
+ // So, we can safely reuse the existing holder.
+ return source.holder();
+ }
+
+ Holder(int zoneId, int partitions, MetaStorageManager
metaStorageManager, String nodeName) {
var localUnrebalancedPartitionsCount = new IntGauge(
LOCAL_UNREBALANCED_PARTITIONS_COUNT,
"The number of partitions that should be moved to this
node.",
() -> {
int unrebalancedParts = 0;
- for (int i = 0; i <
source.zoneDescriptor.partitions(); ++i) {
- ZonePartitionId zonePartitionId = new
ZonePartitionId(source.zoneDescriptor.id(), i);
+ for (int i = 0; i < partitions; ++i) {
+ ZonePartitionId zonePartitionId = new
ZonePartitionId(zoneId, i);
- Entry pendingEntry =
source.metaStorageManager.getLocally(pendingPartAssignmentsQueueKey(zonePartitionId));
+ Entry pendingEntry =
metaStorageManager.getLocally(pendingPartAssignmentsQueueKey(zonePartitionId));
AssignmentsQueue pendingAssignmentsQueue =
AssignmentsQueue.fromBytes(pendingEntry.value());
if (pendingAssignmentsQueue != null) {
- Entry stableEntry =
source.metaStorageManager.getLocally(stablePartAssignmentsKey(zonePartitionId));
+ Entry stableEntry =
metaStorageManager.getLocally(stablePartAssignmentsKey(zonePartitionId));
Assignments stableAssignments =
stableEntry.value() == null
? Assignments.EMPTY
:
Assignments.fromBytes(stableEntry.value());
Assignments targetAssignments =
pendingAssignmentsQueue.peekLast();
- boolean stable =
presentInAssignments(stableAssignments, source.nodeName);
- boolean pending =
presentInAssignments(targetAssignments, source.nodeName);
+ boolean stable =
presentInAssignments(stableAssignments, nodeName);
+ boolean pending =
presentInAssignments(targetAssignments, nodeName);
if (!stable && pending) {
unrebalancedParts += 1;
@@ -117,14 +179,14 @@ public class ZoneMetricSource extends
AbstractMetricSource<ZoneMetricSource.Hold
() -> {
int unrebalancedParts = 0;
- for (int i = 0; i <
source.zoneDescriptor.partitions(); ++i) {
- ZonePartitionId zonePartitionId = new
ZonePartitionId(source.zoneDescriptor.id(), i);
+ for (int i = 0; i < partitions; ++i) {
+ ZonePartitionId zonePartitionId = new
ZonePartitionId(zoneId, i);
- Entry pendingEntry =
source.metaStorageManager.getLocally(pendingPartAssignmentsQueueKey(zonePartitionId));
+ Entry pendingEntry =
metaStorageManager.getLocally(pendingPartAssignmentsQueueKey(zonePartitionId));
AssignmentsQueue pendingAssignmentsQueue =
AssignmentsQueue.fromBytes(pendingEntry.value());
if (pendingAssignmentsQueue != null) {
- Entry stableEntry =
source.metaStorageManager.getLocally(stablePartAssignmentsKey(zonePartitionId));
+ Entry stableEntry =
metaStorageManager.getLocally(stablePartAssignmentsKey(zonePartitionId));
Assignments stableAssignments =
stableEntry.value() == null
? Assignments.EMPTY
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
index e18e9d4aebc..f3e06b9e412 100644
---
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
@@ -77,6 +77,21 @@ public abstract class AbstractMetricSource<T extends
AbstractMetricSource.Holder
this.group = group;
}
+ /**
+ * Base constructor for all metric source implementations.
+ *
+ * @param name Metric source name.
+ * @param description Description.
+ * @param group Optional group name.
+ * @param holder Metric instances holder.
+ */
+ protected AbstractMetricSource(String name, @Nullable String description,
@Nullable String group, @Nullable T holder) {
+ this.name = name;
+ this.description = description;
+ this.group = group;
+ this.holder = holder;
+ }
+
@Override
public final String name() {
return name;