This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 76379fb86a making nonLeaderForTables exhaustive (#12345)
76379fb86a is described below
commit 76379fb86a2810301a5e79113d75437127bb6ccf
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Feb 1 17:59:36 2024 +0100
making nonLeaderForTables exhaustive (#12345)
---
.../controller/helix/SegmentStatusChecker.java | 9 +---
.../core/periodictask/ControllerPeriodicTask.java | 59 ++++++++++++----------
2 files changed, 33 insertions(+), 35 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..f4121506a1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -77,8 +77,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
private TableSizeReader _tableSizeReader;
- private Set<String> _cachedTableNamesWithType = new HashSet<>();
-
/**
* Constructs the segment status checker.
* @param pinotHelixResourceManager The resource checker used to interact
with Helix
@@ -152,12 +150,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_DISABLED, 0);
}
});
-
- // Remove metrics for tables that are no longer in the cluster
- _cachedTableNamesWithType.removeAll(context._processedTables);
- _cachedTableNamesWithType.forEach(this::removeMetricsForTable);
- _cachedTableNamesWithType.clear();
- _cachedTableNamesWithType.addAll(context._processedTables);
}
/**
@@ -350,6 +342,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
}
private void removeMetricsForTable(String tableNameWithType) {
+ LOGGER.info("Removing metrics from {} given it is not a table known by
Helix", tableNameWithType);
_controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS);
_controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS);
_controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3067de8268..6761efde96 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.periodictask;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -46,6 +51,7 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
protected final PinotHelixResourceManager _pinotHelixResourceManager;
protected final LeadControllerManager _leadControllerManager;
protected final ControllerMetrics _controllerMetrics;
+ protected Set<String> _prevLeaderOfTables = new HashSet<>();
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
@@ -63,30 +69,23 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
// Check if we have a specific table against which this task needs to be
run.
String propTableNameWithType = (String)
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
// Process the tables that are managed by this controller
- List<String> tablesToProcess = new ArrayList<>();
- List<String> nonLeaderForTables = new ArrayList<>();
- if (propTableNameWithType == null) {
- // Table name is not available, so task should run on all tables for
which this controller is the lead.
- for (String tableNameWithType :
_pinotHelixResourceManager.getAllTables()) {
- if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
- tablesToProcess.add(tableNameWithType);
- } else {
- nonLeaderForTables.add(tableNameWithType);
- }
- }
- } else {
- // Table name is available, so task should run only on the specified
table.
- if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) {
- tablesToProcess.add(propTableNameWithType);
- }
- }
+ List<String> allTables = propTableNameWithType == null
+ ? _pinotHelixResourceManager.getAllTables()
+ : Collections.singletonList(propTableNameWithType);
+
+ Set<String> currentLeaderOfTables = allTables.stream()
+ .filter(_leadControllerManager::isLeaderForTable)
+ .collect(Collectors.toSet());
- if (!tablesToProcess.isEmpty()) {
- processTables(tablesToProcess, periodicTaskProperties);
+ if (!currentLeaderOfTables.isEmpty()) {
+ processTables(new ArrayList<>(currentLeaderOfTables),
periodicTaskProperties);
}
+
+ Set<String> nonLeaderForTables = Sets.difference(_prevLeaderOfTables,
currentLeaderOfTables);
if (!nonLeaderForTables.isEmpty()) {
- nonLeaderCleanup(nonLeaderForTables);
+ nonLeaderCleanup(new ArrayList<>(nonLeaderForTables));
}
+ _prevLeaderOfTables = currentLeaderOfTables;
} catch (Exception e) {
LOGGER.error("Caught exception while running task: {}", _taskName, e);
_controllerMetrics.addMeteredTableValue(_taskName,
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
@@ -98,9 +97,12 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
}
/**
- * Processes the given list of tables, and returns the number of tables
processed.
+ * Processes the given list of tables led by the current controller, and
returns the number of tables processed.
* <p>
* Override one of this method, {@link #processTable(String)} or {@link
#processTable(String, C)}.
+ * <p>
+ * Note: This method is called each time the task is executed <b>if and only
if</b> the current controller is the
+ * leader of at least one table. A corollary is that it won't be called
every time the task is executed.
*/
protected void processTables(List<String> tableNamesWithType, Properties
periodicTaskProperties) {
int numTables = tableNamesWithType.size();
@@ -128,14 +130,14 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
}
/**
- * Can be overridden to provide context before processing the tables.
+ * Can be overridden to provide context before processing the tables led by
the current controller.
*/
protected C preprocess(Properties periodicTaskProperties) {
return null;
}
/**
- * Processes the given table.
+ * Processes the given table led by the current controller.
* <p>
* Override one of this method, {@link #processTable(String)} or {@link
#processTables(List, Properties)}.
*/
@@ -144,7 +146,7 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
}
/**
- * Processes the given table.
+ * Processes the given table led by the current controller.
* <p>
* Override one of this method, {@link #processTable(String, C)} or {@link
#processTables(List, Properties)}.
*/
@@ -152,20 +154,23 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
}
/**
- * Can be overridden to perform cleanups after processing the tables.
+ * Can be overridden to perform cleanups after processing the tables led by
the current controller.
*/
protected void postprocess(C context) {
postprocess();
}
/**
- * Can be overridden to perform cleanups after processing the tables.
+ * Can be overridden to perform cleanups after processing the tables led by
the current controller.
*/
protected void postprocess() {
}
/**
- * Can be overridden to perform cleanups for tables that the current
controller isn't the leader.
+ * Can be overridden to perform cleanups for tables for which the current
controller has lost the leadership.
+ * <p>
+ * Note: This method is only being called when there is at least one table
in the given list. A corollary is that it
+ * won't be called every time the task is executed.
*
* @param tableNamesWithType the table names that the current controller
isn't the leader for
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]