This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 98c39a5f71 Repurpose MinionInstancesCleanupTask to
StaleInstancesCleanupTask to remove stale broker and server instances as well.
(#10027)
98c39a5f71 is described below
commit 98c39a5f718b7d1f623cd4f856a720206773544d
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jan 4 21:47:42 2023 -0800
Repurpose MinionInstancesCleanupTask to StaleInstancesCleanupTask to remove
stale broker and server instances as well. (#10027)
---
.../pinot/common/metrics/ControllerGauge.java | 8 +-
.../pinot/controller/BaseControllerStarter.java | 14 +-
.../apache/pinot/controller/ControllerConf.java | 52 ++++++++
.../core/cleanup/StaleInstancesCleanupTask.java | 146 +++++++++++++++++++++
.../core/minion/MinionInstancesCleanupTask.java | 88 -------------
.../StaleInstancesCleanupTaskStatelessTest.java | 138 +++++++++++++++++++
.../MinionInstancesCleanupTaskStatelessTest.java | 82 ------------
7 files changed, 350 insertions(+), 178 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index b44bd8e999..64ced0203c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -101,9 +101,15 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
// Number of Tasks Status
TASK_STATUS("taskStatus", false),
- // Number of dropped minion instances
+ // Number of dropped stale minion instances
DROPPED_MINION_INSTANCES("droppedMinionInstances", true),
+ // Number of dropped stale broker instances
+ DROPPED_BROKER_INSTANCES("droppedBrokerInstances", true),
+
+ // Number of dropped stale server instances
+ DROPPED_SERVER_INSTANCES("droppedServerInstances", true),
+
// Number of online minion instances
ONLINE_MINION_INSTANCES("onlineMinionInstances", true),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index f6ebb5006d..83db19eccf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -76,7 +76,7 @@ import
org.apache.pinot.controller.api.resources.InvalidControllerConfigExceptio
import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
import org.apache.pinot.controller.helix.SegmentStatusChecker;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import
org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask;
+import
org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
@@ -164,7 +164,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected SegmentCompletionManager _segmentCompletionManager;
protected LeadControllerManager _leadControllerManager;
protected List<ServiceStatus.ServiceStatusCallback>
_serviceStatusCallbackList;
- protected MinionInstancesCleanupTask _minionInstancesCleanupTask;
+ protected StaleInstancesCleanupTask _staleInstancesCleanupTask;
protected TaskMetricsEmitter _taskMetricsEmitter;
protected MultiThreadedHttpConnectionManager _connectionManager;
@@ -295,8 +295,8 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
return _taskManager;
}
- public MinionInstancesCleanupTask getMinionInstancesCleanupTask() {
- return _minionInstancesCleanupTask;
+ public StaleInstancesCleanupTask getStaleInstancesCleanupTask() {
+ return _staleInstancesCleanupTask;
}
@Override
@@ -687,9 +687,9 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
_segmentRelocator = new SegmentRelocator(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
_executorService, _connectionManager);
periodicTasks.add(_segmentRelocator);
- _minionInstancesCleanupTask =
- new MinionInstancesCleanupTask(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics);
- periodicTasks.add(_minionInstancesCleanupTask);
+ _staleInstancesCleanupTask =
+ new StaleInstancesCleanupTask(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics);
+ periodicTasks.add(_staleInstancesCleanupTask);
_taskMetricsEmitter =
new TaskMetricsEmitter(_helixResourceManager,
_helixTaskResourceManager, _leadControllerManager, _config,
_controllerMetrics);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index e24530ab4c..015b1e90de 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -129,16 +129,27 @@ public class ControllerConf extends PinotConfiguration {
@Deprecated
public static final String
DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS =
"controller.minion.instances.cleanup.task.frequencyInSeconds";
+ @Deprecated
public static final String MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD =
"controller.minion.instances.cleanup.task.frequencyPeriod";
+ @Deprecated
public static final String
MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
"controller.minion.instances.cleanup.task.initialDelaySeconds";
// Deprecated as of 0.8.0
@Deprecated
public static final String
DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_SECONDS
=
"controller.minion.instances.cleanup.task.minOfflineTimeBeforeDeletionSeconds";
+ @Deprecated
public static final String
MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD =
"controller.minion.instances.cleanup.task.minOfflineTimeBeforeDeletionPeriod";
+
+ public static final String STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD =
+ "controller.stale.instances.cleanup.task.frequencyPeriod";
+ public static final String
STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
+ "controller.stale.instances.cleanup.task.initialDelaySeconds";
+ public static final String
STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD =
+
"controller.stale.instances.cleanup.task.minOfflineTimeBeforeDeletionPeriod";
+
// Deprecated as of 0.8.0
@Deprecated
public static final String
DEPRECATED_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS =
@@ -209,7 +220,9 @@ public class ControllerConf extends PinotConfiguration {
private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS
= 5 * 60; // 5 minutes
private static final int
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1;
// Disabled
+ @Deprecated
private static final int
DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+ @Deprecated
private static final int
DEFAULT_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_IN_SECONDS
=
60 * 60; // 1 Hour.
@@ -699,6 +712,7 @@ public class ControllerConf extends PinotConfiguration {
Integer.toString(frequencyInSeconds));
}
+ @Deprecated
public int getMinionInstancesCleanupTaskFrequencyInSeconds() {
return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
@@ -706,21 +720,25 @@ public class ControllerConf extends PinotConfiguration {
ControllerPeriodicTasksConf.DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS));
}
+ @Deprecated
public void setMinionInstancesCleanupTaskFrequencyInSeconds(int
frequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS,
Integer.toString(frequencyInSeconds));
}
+ @Deprecated
public long getMinionInstancesCleanupTaskInitialDelaySeconds() {
return
getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
}
+ @Deprecated
public void setMinionInstancesCleanupTaskInitialDelaySeconds(int
initialDelaySeconds) {
setProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
Integer.toString(initialDelaySeconds));
}
+ @Deprecated
public int
getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds() {
return Optional.ofNullable(
getProperty(ControllerPeriodicTasksConf.MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD))
@@ -731,12 +749,46 @@ public class ControllerConf extends PinotConfiguration {
DEFAULT_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_IN_SECONDS));
}
+ @Deprecated
public void
setMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds(int
maxOfflineTimeRangeInSeconds) {
setProperty(
ControllerPeriodicTasksConf.DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_SECONDS,
Integer.toString(maxOfflineTimeRangeInSeconds));
}
+ public int getStaleInstancesCleanupTaskFrequencyInSeconds() {
+ return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD))
+ .map(period -> (int) convertPeriodToSeconds(period))
+ // Backward compatible for existing users who configured
MinionInstancesCleanupTask
+ .orElse(getMinionInstancesCleanupTaskFrequencyInSeconds());
+ }
+
+ public void setStaleInstanceCleanupTaskFrequencyInSeconds(String
frequencyPeriod) {
+
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD,
frequencyPeriod);
+ }
+
+ public long getStaleInstanceCleanupTaskInitialDelaySeconds() {
+ return
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
+ // Backward compatible for existing users who configured
MinionInstancesCleanupTask
+ getMinionInstancesCleanupTaskInitialDelaySeconds());
+ }
+
+ public void setStaleInstanceCleanupTaskInitialDelaySeconds(long
initialDelaySeconds) {
+
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
initialDelaySeconds);
+ }
+
+ public int getStaleInstancesCleanupTaskInstancesRetentionInSeconds() {
+ return Optional.ofNullable(
+
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD))
+ .map(period -> (int) convertPeriodToSeconds(period))
+ // Backward compatible for existing users who configured
MinionInstancesCleanupTask
+
.orElse(getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds());
+ }
+
+ public void setStaleInstancesCleanupTaskInstancesRetentionPeriod(String
retentionPeriod) {
+
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD,
retentionPeriod);
+ }
+
public int getDefaultTableMinReplicas() {
return getProperty(TABLE_MIN_REPLICAS, DEFAULT_TABLE_MIN_REPLICAS);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
new file mode 100644
index 0000000000..b257462985
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.cleanup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Automatically removes stale instances from the cluster to not spam Helix.
+ * A stale instance is one that is not in use (not hosting any data or query)
and has been offline for longer
+ * than the stale instance retention time.
+ */
+public class StaleInstancesCleanupTask extends BasePeriodicTask {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StaleInstancesCleanupTask.class);
+ private final static String TASK_NAME = "StaleInstancesCleanupTask";
+
+ protected final PinotHelixResourceManager _pinotHelixResourceManager;
+ protected final LeadControllerManager _leadControllerManager;
+ protected final ControllerMetrics _controllerMetrics;
+ // This applies to both broker and server instances.
+ private final long
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds;
+
+ public StaleInstancesCleanupTask(PinotHelixResourceManager
pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf
controllerConf, ControllerMetrics controllerMetrics) {
+ super(TASK_NAME,
controllerConf.getStaleInstancesCleanupTaskFrequencyInSeconds(),
+ controllerConf.getStaleInstanceCleanupTaskInitialDelaySeconds());
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ _leadControllerManager = leadControllerManager;
+ _controllerMetrics = controllerMetrics;
+ _staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds =
+
controllerConf.getStaleInstancesCleanupTaskInstancesRetentionInSeconds() *
1000L;
+ }
+
+ @Override
+ protected void runTask(Properties periodicTaskProperties) {
+ // Make it so that only one controller is responsible for cleaning up
stale instances.
+ if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
+ return;
+ }
+
+ List<String> offlineInstances = new
ArrayList<>(_pinotHelixResourceManager.getAllInstances());
+
offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
+
+ Set<String> serverInstancesInUse = getServerInstancesInUse();
+ Set<String> brokerInstancesInUse = getBrokerInstancesInUse();
+
+ for (String offlineInstance : offlineInstances) {
+ // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK
session expire (e.g. due to network issue),
+ // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race
condition can happen when this task is
+ // running.
+ // In order to double confirm the live status of an instance, the field
"LAST_OFFLINE_TIME" in ZNode under
+ // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value
is "-1", that means the instance is
+ // ONLINE;
+ // if the value is a timestamp, that means the instance starts to be
OFFLINE since that time.
+ if (InstanceTypeUtils.isMinion(offlineInstance)) {
+ // Drop the minion instance if it has been offline for longer than the
configured retention time.
+ if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
+
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
+ LOGGER.info("Dropping minion instance: {}", offlineInstance);
+ PinotResourceManagerResponse response =
_pinotHelixResourceManager.dropInstance(offlineInstance);
+ if (response.isSuccessful()) {
+
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES,
1);
+ }
+ }
+ continue;
+ }
+
+ // Drop the broker instance if it is not in use and has been offline for
longer than the configured retention time.
+ if (InstanceTypeUtils.isBroker(offlineInstance) &&
!brokerInstancesInUse.contains(offlineInstance)) {
+ if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
+
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
+ LOGGER.info("Dropping broker instance: {}", offlineInstance);
+ PinotResourceManagerResponse response =
_pinotHelixResourceManager.dropInstance(offlineInstance);
+ if (response.isSuccessful()) {
+
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES,
1);
+ }
+ }
+ continue;
+ }
+
+ // Drop the server instance if it is not in use and has been offline for
longer than the configured retention time.
+ if (InstanceTypeUtils.isServer(offlineInstance) &&
!serverInstancesInUse.contains(offlineInstance)) {
+ if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
+
_staleInstancesCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
+ LOGGER.info("Dropping server instance: {}", offlineInstance);
+ PinotResourceManagerResponse response =
_pinotHelixResourceManager.dropInstance(offlineInstance);
+ if (response.isSuccessful()) {
+
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES,
1);
+ }
+ }
+ }
+ }
+ }
+
+ private Set<String> getBrokerInstancesInUse() {
+ Set<String> brokerInstancesInUse = new HashSet<>();
+ final IdealState brokerResource =
_pinotHelixResourceManager.getHelixAdmin()
+
.getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ brokerResource.getPartitionSet()
+ .forEach(table ->
brokerInstancesInUse.addAll(brokerResource.getInstanceSet(table)));
+ return brokerInstancesInUse;
+ }
+
+ private Set<String> getServerInstancesInUse() {
+ Set<String> serverInstancesInUse = new HashSet<>();
+ _pinotHelixResourceManager.getAllTables().forEach(tableName ->
serverInstancesInUse.addAll(
+
Optional.ofNullable(_pinotHelixResourceManager.getTableIdealState(tableName))
+ .map(is ->
is.getInstanceSet(tableName)).orElse(Collections.emptySet())));
+ return serverInstancesInUse;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
deleted file mode 100644
index 09392c6b00..0000000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.minion;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.LeadControllerManager;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.core.periodictask.BasePeriodicTask;
-import org.apache.pinot.spi.utils.InstanceTypeUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A periodic task to clean up offline Minion instances to not spam Helix.
- */
-public class MinionInstancesCleanupTask extends BasePeriodicTask {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MinionInstancesCleanupTask.class);
- private final static String TASK_NAME = "MinionInstancesCleanupTask";
- protected final PinotHelixResourceManager _pinotHelixResourceManager;
- protected final LeadControllerManager _leadControllerManager;
- protected final ControllerMetrics _controllerMetrics;
- private final long
_minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds;
-
- public MinionInstancesCleanupTask(PinotHelixResourceManager
pinotHelixResourceManager,
- LeadControllerManager leadControllerManager, ControllerConf
controllerConf, ControllerMetrics controllerMetrics) {
- super(TASK_NAME,
controllerConf.getMinionInstancesCleanupTaskFrequencyInSeconds(),
- controllerConf.getMinionInstancesCleanupTaskInitialDelaySeconds());
- _pinotHelixResourceManager = pinotHelixResourceManager;
- _leadControllerManager = leadControllerManager;
- _controllerMetrics = controllerMetrics;
- _minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds =
-
controllerConf.getMinionInstancesCleanupTaskMinOfflineTimeBeforeDeletionInSeconds()
* 1000L;
- }
-
- @Override
- protected void runTask(Properties periodicTaskProperties) {
- // Make it so that only one controller is responsible for cleaning up
minion instances.
- if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
- return;
- }
-
- List<String> offlineInstances = new
ArrayList<>(_pinotHelixResourceManager.getAllInstances());
-
offlineInstances.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
- for (String offlineInstance : offlineInstances) {
- // Since ZNodes under "/LIVEINSTANCES" are ephemeral, if there is a ZK
session expire (e.g. due to network issue),
- // the ZNode under "/LIVEINSTANCES" will be deleted. Thus, such race
condition can happen when this task is
- // running.
- // In order to double confirm the live status of an instance, the field
"LAST_OFFLINE_TIME" in ZNode under
- // "/INSTANCES/<instance_id>/HISTORY" needs to be checked. If the value
is "-1", that means the instance is
- // ONLINE;
- // if the value is a timestamp, that means the instance starts to be
OFFLINE since that time.
- if (InstanceTypeUtils.isMinion(offlineInstance)) {
- // Drop the minion instance if it has been offline for more than a
period of this task.
- if (_pinotHelixResourceManager.isInstanceOfflineFor(offlineInstance,
-
_minionInstanceCleanupTaskMinOfflineTimeBeforeDeletionInMilliseconds)) {
- LOGGER.info("Dropping minion instance: {}", offlineInstance);
- PinotResourceManagerResponse response =
_pinotHelixResourceManager.dropInstance(offlineInstance);
- if (response.isSuccessful()) {
-
_controllerMetrics.addValueToGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES,
1);
- }
- }
- }
- }
- }
-}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java
new file mode 100644
index 0000000000..4bc0b4cd10
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.cleanup;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "stateless")
+public class StaleInstancesCleanupTaskStatelessTest extends ControllerTest {
+ @BeforeClass
+ public void setup()
+ throws Exception {
+ startZk();
+ startController();
+ }
+
+ @Test
+ public void testStaleInstancesCleanupTaskForBrokers()
+ throws Exception {
+ StaleInstancesCleanupTask staleInstancesCleanupTask =
_controllerStarter.getStaleInstancesCleanupTask();
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
0);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(3, true);
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
0);
+ stopFakeInstance("Broker_localhost_0");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
1);
+ stopFakeInstance("Broker_localhost_1");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
2);
+ stopFakeInstance("Broker_localhost_2");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES),
3);
+ }
+
+ @Test
+ public void testStaleInstancesCleanupTaskForServers()
+ throws Exception {
+ StaleInstancesCleanupTask staleInstancesCleanupTask =
_controllerStarter.getStaleInstancesCleanupTask();
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
0);
+ addFakeServerInstancesToAutoJoinHelixCluster(3, true);
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
0);
+ stopFakeInstance("Server_localhost_0");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
1);
+ stopFakeInstance("Server_localhost_1");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
2);
+ stopFakeInstance("Server_localhost_2");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES),
3);
+ }
+
+ @Test
+ public void testStaleInstancesCleanupTaskForMinions()
+ throws Exception {
+ StaleInstancesCleanupTask staleInstancesCleanupTask =
_controllerStarter.getStaleInstancesCleanupTask();
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
0);
+ addFakeMinionInstancesToAutoJoinHelixCluster(3);
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
0);
+ stopFakeInstance("Minion_localhost_0");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
1);
+ stopFakeInstance("Minion_localhost_1");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
2);
+ stopFakeInstance("Minion_localhost_2");
+ Thread.sleep(1000);
+ staleInstancesCleanupTask.runTask(new Properties());
+ Assert.assertEquals(
+
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
3);
+ }
+
+ @Override
+ public Map<String, Object> getDefaultControllerConfiguration() {
+ Map<String, Object> properties = super.getDefaultControllerConfiguration();
+ // Shorten the offline-time-before-deletion periods so that stopped instances
become eligible for cleanup within the test.
+ properties.put(ControllerConf.ControllerPeriodicTasksConf.
+ MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD,
"1s");
+ properties.put(ControllerConf.ControllerPeriodicTasksConf.
+ STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD, "1s");
+ return properties;
+ }
+
+ @AfterClass
+ public void teardown() {
+ stopController();
+ stopZk();
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java
deleted file mode 100644
index 950f7c5492..0000000000
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskStatelessTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.minion;
-
-import java.util.Map;
-import java.util.Properties;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerTest;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-@Test(groups = "stateless")
-public class MinionInstancesCleanupTaskStatelessTest extends ControllerTest {
- @BeforeClass
- public void setup()
- throws Exception {
- startZk();
- startController();
- }
-
- @Test
- public void testMinionInstancesCleanupTask()
- throws Exception {
- MinionInstancesCleanupTask minionInstancesCleanupTask =
_controllerStarter.getMinionInstancesCleanupTask();
- minionInstancesCleanupTask.runTask(new Properties());
- Assert.assertEquals(
-
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
0);
- addFakeMinionInstancesToAutoJoinHelixCluster(3);
- Assert.assertEquals(
-
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
0);
- stopFakeInstance("Minion_localhost_0");
- Thread.sleep(1000);
- minionInstancesCleanupTask.runTask(new Properties());
- Assert.assertEquals(
-
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
1);
- stopFakeInstance("Minion_localhost_1");
- Thread.sleep(1000);
- minionInstancesCleanupTask.runTask(new Properties());
- Assert.assertEquals(
-
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
2);
- stopFakeInstance("Minion_localhost_2");
- Thread.sleep(1000);
- minionInstancesCleanupTask.runTask(new Properties());
- Assert.assertEquals(
-
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
3);
- }
-
- @Override
- public Map<String, Object> getDefaultControllerConfiguration() {
- Map<String, Object> properties = super.getDefaultControllerConfiguration();
- // Override the cleanup before deletion period so that test can avoid
stuck failure
- properties.put(ControllerConf.ControllerPeriodicTasksConf.
- MINION_INSTANCES_CLEANUP_TASK_MIN_OFFLINE_TIME_BEFORE_DELETION_PERIOD,
"1s");
- return properties;
- }
-
- @AfterClass
- public void teardown() {
- stopController();
- stopZk();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]