This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8977c85e4f Remove all table metrics when a table is deleted (#12403)
8977c85e4f is described below
commit 8977c85e4f8e3a8529f875ec32f0042c52938faf
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Feb 26 18:44:01 2024 +0100
Remove all table metrics when a table is deleted (#12403)
---
.../pinot/common/metrics/AbstractMetrics.java | 15 ++++++
.../controller/helix/SegmentStatusChecker.java | 54 ++++++++--------------
.../controller/helix/SegmentStatusCheckerTest.java | 24 +++++-----
.../manager/realtime/IngestionDelayTracker.java | 2 +
.../ControllerPeriodicTasksIntegrationTest.java | 3 +-
.../helix/SegmentMessageHandlerFactory.java | 20 ++++++++
6 files changed, 69 insertions(+), 49 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 456d6ecb14..ee13493e15 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -133,6 +133,11 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
}
}
+ public void removePhaseTiming(String tableName, QP phase) {
+ String fullTimerName = _metricPrefix + getTableName(tableName) + "." +
phase.getQueryPhaseName();
+ removeTimer(fullTimerName);
+ }
+
public void addPhaseTiming(String tableName, QP phase, long duration,
TimeUnit timeUnit) {
String fullTimerName = _metricPrefix + getTableName(tableName) + "." +
phase.getQueryPhaseName();
addValueToTimer(fullTimerName, duration, timeUnit);
@@ -208,6 +213,16 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
}
}
+ public void removeTimer(final String fullTimerName) {
+ PinotMetricUtils
+ .removeMetric(_metricsRegistry,
PinotMetricUtils.makePinotMetricName(_clazz, fullTimerName));
+ }
+
+ public void removeTableTimer(final String tableName, final T timer) {
+ final String fullTimerName = _metricPrefix + getTableName(tableName) + "."
+ timer.getTimerName();
+ removeTimer(fullTimerName);
+ }
+
/**
* Logs a value to a meter.
*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 5b543e4319..dd598b2b21 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -38,7 +38,9 @@ import
org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -97,8 +99,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
@Override
protected void setUpTask() {
- LOGGER.info("Initializing table metrics for all the tables.");
- setStatusToDefault();
}
@Override
@@ -123,7 +123,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
} catch (Exception e) {
LOGGER.error("Caught exception while updating segment status for table
{}", tableNameWithType, e);
// Remove the metric for this table
- resetTableMetrics(tableNameWithType);
+ removeMetricsForTable(tableNameWithType);
}
context._processedTables.add(tableNameWithType);
}
@@ -187,7 +187,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
if (idealState == null) {
LOGGER.warn("Table {} has null ideal state. Skipping segment status
checks", tableNameWithType);
- resetTableMetrics(tableNameWithType);
+ removeMetricsForTable(tableNameWithType);
return;
}
@@ -195,7 +195,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
if (context._logDisabledTables) {
LOGGER.warn("Table {} is disabled. Skipping segment status checks",
tableNameWithType);
}
- resetTableMetrics(tableNameWithType);
+ removeMetricsForTable(tableNameWithType);
context._disabledTables.add(tableNameWithType);
return;
}
@@ -354,43 +354,27 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
private void removeMetricsForTable(String tableNameWithType) {
LOGGER.info("Removing metrics from {} given it is not a table known by
Helix", tableNameWithType);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_SIZE);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED);
-
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.TABLE_DISABLED);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.TABLE_CONSUMPTION_PAUSED);
- _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS);
- }
-
- private void setStatusToDefault() {
- List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
+ for (ControllerGauge metric : ControllerGauge.values()) {
+ if (!metric.isGlobal()) {
+ _controllerMetrics.removeTableGauge(tableNameWithType, metric);
+ }
+ }
- for (String tableName : allTableNames) {
- resetTableMetrics(tableName);
+ for (ControllerMeter metric : ControllerMeter.values()) {
+ if (!metric.isGlobal()) {
+ _controllerMetrics.removeTableMeter(tableNameWithType, metric);
+ }
}
- }
- private void resetTableMetrics(String tableName) {
- _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS, Long.MIN_VALUE);
- _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_OF_REPLICAS, Long.MIN_VALUE);
- _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
- _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
- Long.MIN_VALUE);
- _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
+ for (ControllerTimer metric : ControllerTimer.values()) {
+ if (!metric.isGlobal()) {
+ _controllerMetrics.removeTableTimer(tableNameWithType, metric);
+ }
+ }
}
@Override
public void cleanUpTask() {
- LOGGER.info("Resetting table metrics for all the tables.");
- setStatusToDefault();
}
@VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 99991b3d4c..c7974b9d0b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -452,14 +452,14 @@ public class SegmentStatusCheckerTest {
_segmentStatusChecker.setTableSizeReader(_tableSizeReader);
_segmentStatusChecker.start();
_segmentStatusChecker.run();
-
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE);
-
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), Long.MIN_VALUE);
-
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
- ControllerGauge.NUMBER_OF_REPLICAS), Long.MIN_VALUE);
-
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
- ControllerGauge.PERCENT_OF_REPLICAS), Long.MIN_VALUE);
+ Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
tableName,
+ ControllerGauge.SEGMENTS_IN_ERROR_STATE));
+ Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
tableName,
+ ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS));
+ Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
tableName,
+ ControllerGauge.NUMBER_OF_REPLICAS));
+ Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
tableName,
+ ControllerGauge.PERCENT_OF_REPLICAS));
Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
tableName,
ControllerGauge.TABLE_COMPRESSED_SIZE));
}
@@ -820,10 +820,10 @@ public class SegmentStatusCheckerTest {
_segmentStatusChecker.start();
_segmentStatusChecker.run();
-
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE);
-
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE);
+ Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
tableName,
+ ControllerGauge.SEGMENTS_IN_ERROR_STATE));
+ Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
tableName,
+ ControllerGauge.SEGMENTS_IN_ERROR_STATE));
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
ControllerGauge.NUMBER_OF_REPLICAS), nReplicasExpectedValue);
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableName,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 423f1f21cb..8114579d29 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -185,6 +185,8 @@ public class IngestionDelayTracker {
// If we are removing a partition we should stop reading its ideal state.
_partitionsMarkedForVerification.remove(partitionGroupId);
_serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
+ _serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
+ ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
}
/*
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 39e4855f63..6b558b93e7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -223,8 +223,7 @@ public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrati
return false;
}
if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
- TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null,
Long.MIN_VALUE, Long.MIN_VALUE,
- Long.MIN_VALUE, Long.MIN_VALUE)) {
+ TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null, 0,
0, 0, 0)) {
return false;
}
String tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index bf3f2f2f22..7fe52f472b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.server.starter.helix;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
@@ -32,8 +33,11 @@ import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableDeletionMessage;
+import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.slf4j.Logger;
@@ -177,6 +181,22 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
_metrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.DELETE_TABLE_FAILURES, 1);
Utils.rethrowException(e);
}
+ try {
+ Arrays.stream(ServerMeter.values())
+ .filter(m -> !m.isGlobal())
+ .forEach(m -> _metrics.removeTableMeter(_tableNameWithType, m));
+ Arrays.stream(ServerGauge.values())
+ .filter(g -> !g.isGlobal())
+ .forEach(g -> _metrics.removeTableGauge(_tableNameWithType, g));
+ Arrays.stream(ServerTimer.values())
+ .filter(t -> !t.isGlobal())
+ .forEach(t -> _metrics.removeTableTimer(_tableNameWithType, t));
+ Arrays.stream(ServerQueryPhase.values())
+ .forEach(p -> _metrics.removePhaseTiming(_tableNameWithType, p));
+ } catch (Exception e) {
+ LOGGER.warn("Error while removing metrics of removed table {}. "
+ + "Some metrics may survive until the next restart.",
_tableNameWithType);
+ }
return helixTaskResult;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org