This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1beedd5 Enhance controller periodic task and scheduler (#3475)
1beedd5 is described below
commit 1beedd53315882e7ad58c439bf190dc5b671652b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Nov 15 16:42:41 2018 -0800
Enhance controller periodic task and scheduler (#3475)
For ControllerPeriodicTask:
1. Random pick an initial delay of range 2-5 mins so each task does not
start at the same time
2. Make leadership information private to the class
For PeriodicTaskScheduler:
1. Filter out tasks with non-positive interval, so we can easily disable
any task by setting the interval to an non-positive value
2. Set the size of thread pool to be the same as number of tasks
3. Reduce the run-time of PeriodicTaskSchedulerTest
---
.../pinot/controller/ControllerStarter.java | 37 ++---
.../controller/helix/SegmentStatusChecker.java | 17 +-
.../helix/core/minion/PinotTaskManager.java | 16 +-
.../core/periodictask/ControllerPeriodicTask.java | 64 ++++----
.../core/relocation/RealtimeSegmentRelocator.java | 13 +-
.../helix/core/retention/RetentionManager.java | 16 +-
.../controller/validation/ValidationManager.java | 16 +-
.../controller/helix/SegmentStatusCheckerTest.java | 68 +-------
.../periodictask/ControllerPeriodicTaskTest.java | 128 ++++++++-------
.../pinot/core/periodictask/BasePeriodicTask.java | 12 +-
.../core/periodictask/PeriodicTaskScheduler.java | 62 ++++---
.../periodictask/PeriodicTaskSchedulerTest.java | 181 +++++----------------
12 files changed, 238 insertions(+), 392 deletions(-)
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 69a4a4f..bf25cef 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -38,9 +38,9 @@ import
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrate
import
com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
import com.linkedin.pinot.core.periodictask.PeriodicTask;
import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
-import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
import com.linkedin.pinot.filesystem.PinotFSFactory;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.File;
@@ -164,40 +164,23 @@ public class ControllerStarter {
LOGGER.info("Starting task resource manager");
_helixTaskResourceManager = new PinotHelixTaskResourceManager(new
TaskDriver(helixManager));
- List<PeriodicTask> periodicTasks = new ArrayList<>();
+ // Helix resource manager must be started in order to create
PinotLLCRealtimeSegmentManager
+ LOGGER.info("Starting realtime segment manager");
+ PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config,
_controllerMetrics);
+ PinotLLCRealtimeSegmentManager.getInstance().start();
+ _realtimeSegmentsManager.start(_controllerMetrics);
+ // Setting up periodic tasks
+ List<PeriodicTask> periodicTasks = new ArrayList<>();
_taskManager = new PinotTaskManager(_helixTaskResourceManager,
_helixResourceManager, _config, _controllerMetrics);
- if (_taskManager.getIntervalInSeconds() > 0) {
- LOGGER.info("Adding task manager to periodic task scheduler");
- periodicTasks.add(_taskManager);
- }
-
- LOGGER.info("Adding retention manager to periodic task scheduler");
+ periodicTasks.add(_taskManager);
periodicTasks.add(_retentionManager);
-
- LOGGER.info("Adding validation manager to periodic task scheduler");
- // Helix resource manager must be started in order to create
PinotLLCRealtimeSegmentManager
- PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config,
_controllerMetrics);
_validationManager =
new ValidationManager(_config, _helixResourceManager,
PinotLLCRealtimeSegmentManager.getInstance(),
new ValidationMetrics(_metricsRegistry));
periodicTasks.add(_validationManager);
-
- LOGGER.info("Starting realtime segment manager");
- _realtimeSegmentsManager.start(_controllerMetrics);
- PinotLLCRealtimeSegmentManager.getInstance().start();
-
- if (_segmentStatusChecker.getIntervalInSeconds() == -1L) {
- LOGGER.warn("Segment status check interval is -1, status checks
disabled.");
- } else {
- LOGGER.info("Adding segment status checker to periodic task scheduler");
- periodicTasks.add(_segmentStatusChecker);
- }
-
- LOGGER.info("Adding realtime segment relocation manager to periodic task
scheduler");
+ periodicTasks.add(_segmentStatusChecker);
periodicTasks.add(_realtimeSegmentRelocator);
-
- LOGGER.info("Starting periodic task scheduler");
_periodicTaskScheduler.start(periodicTasks);
LOGGER.info("Creating rebalance segments factory");
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 4e51bd6..549dc9d 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -27,8 +27,6 @@ import
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicT
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.helix.HelixAdmin;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ExternalView;
@@ -69,7 +67,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
_config = config;
_waitForPushTimeSeconds =
config.getStatusCheckerWaitForPushTimeInSeconds();
_metricsRegistry = metricsRegistry;
- HttpConnectionManager httpConnectionManager = new
MultiThreadedHttpConnectionManager();
}
@Override
@@ -85,15 +82,17 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> allTableNames) {
- updateSegmentMetrics(allTableNames);
+ public void process(List<String> tables) {
+ updateSegmentMetrics(tables);
}
/**
- * Runs a segment status pass over the currently loaded tables.
- * @param allTableNames List of all the table names
+ * Runs a segment status pass over the given tables.
+ * TODO: revisit the logic and reduce the ZK access
+ *
+ * @param tables List of table names
*/
- private void updateSegmentMetrics(List<String> allTableNames) {
+ private void updateSegmentMetrics(List<String> tables) {
// Fetch the list of tables
String helixClusterName = _pinotHelixResourceManager.getHelixClusterName();
HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
@@ -112,7 +111,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
logDisabledTables = false;
}
- for (String tableName : allTableNames) {
+ for (String tableName : tables) {
try {
if (TableNameBuilder.getTableTypeFromTableName(tableName) ==
TableType.OFFLINE) {
offlineTableCount++;
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 460121e..0ff695d 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -52,8 +52,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager
helixTaskResourceManager,
@Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull
ControllerConf controllerConf,
@Nonnull ControllerMetrics controllerMetrics) {
- super("PinotTaskManager",
controllerConf.getTaskManagerFrequencyInSeconds(),
- Math.min(60, controllerConf.getTaskManagerFrequencyInSeconds()),
helixResourceManager);
+ super("PinotTaskManager",
controllerConf.getTaskManagerFrequencyInSeconds(), helixResourceManager);
_helixTaskResourceManager = helixTaskResourceManager;
_clusterInfoProvider = new ClusterInfoProvider(helixResourceManager,
helixTaskResourceManager, controllerConf);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
@@ -82,12 +81,13 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
}
/**
- * Check the Pinot cluster status and schedule new tasks.
- * @param allTableNames List of all the table names
+ * Check the Pinot cluster status and schedule new tasks for the given
tables.
+ *
+ * @param tables List of table names
* @return Map from task type to task scheduled
*/
@Nonnull
- private Map<String, String> scheduleTasks(List<String> allTableNames) {
+ private Map<String, String> scheduleTasks(List<String> tables) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
@@ -102,7 +102,7 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
}
// Scan all table configs to get the tables with tasks enabled
- for (String tableName : allTableNames) {
+ for (String tableName : tables) {
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableName);
if (tableConfig != null) {
TableTaskConfig taskConfig = tableConfig.getTaskConfig();
@@ -155,7 +155,7 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> allTableNames) {
- scheduleTasks(allTableNames);
+ public void process(List<String> tables) {
+ scheduleTasks(tables);
}
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index f00e715..3d1ef6a 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -15,10 +15,10 @@
*/
package com.linkedin.pinot.controller.helix.core.periodictask;
-import com.google.common.annotations.VisibleForTesting;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
import java.util.List;
+import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,21 +28,29 @@ import org.slf4j.LoggerFactory;
* which table resources should be managed by this Pinot controller.
*/
public abstract class ControllerPeriodicTask extends BasePeriodicTask {
- public static final Logger LOGGER =
LoggerFactory.getLogger(ControllerPeriodicTask.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ControllerPeriodicTask.class);
+ private static final Random RANDOM = new Random();
+
+ public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
+ public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+
protected final PinotHelixResourceManager _pinotHelixResourceManager;
- private boolean _amILeader;
- private static final int DEFAULT_INITIAL_DELAY_IN_SECOND = 120;
- public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
- PinotHelixResourceManager pinotHelixResourceManager) {
- this(taskName, runFrequencyInSeconds, DEFAULT_INITIAL_DELAY_IN_SECOND,
pinotHelixResourceManager);
- }
+ private boolean _isLeader = false;
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager) {
super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
_pinotHelixResourceManager = pinotHelixResourceManager;
- setAmILeader(false);
+ }
+
+ public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
+ PinotHelixResourceManager pinotHelixResourceManager) {
+ this(taskName, runFrequencyInSeconds, getRandomInitialDelayInSeconds(),
pinotHelixResourceManager);
+ }
+
+ private static long getRandomInitialDelayInSeconds() {
+ return MIN_INITIAL_DELAY_IN_SECONDS +
RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
}
@Override
@@ -60,38 +68,29 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
}
private void skipLeaderTask() {
- if (getAmILeader()) {
+ if (_isLeader) {
LOGGER.info("Current pinot controller lost leadership.");
+ _isLeader = false;
onBecomeNotLeader();
}
- setAmILeader(false);
- LOGGER.info("Skip running periodic task: {} on non-leader controller",
getTaskName());
+ LOGGER.info("Skip running periodic task: {} on non-leader controller",
_taskName);
}
- private void processLeaderTask(List<String> allTableNames) {
- if (!getAmILeader()) {
+ private void processLeaderTask(List<String> tables) {
+ if (!_isLeader) {
LOGGER.info("Current pinot controller became leader. Starting {} with
running frequency of {} seconds.",
- getTaskName(), getIntervalInSeconds());
+ _taskName, _intervalInSeconds);
+ _isLeader = true;
onBecomeLeader();
}
- setAmILeader(true);
long startTime = System.currentTimeMillis();
- LOGGER.info("Starting to process {} tables in periodic task: {}",
allTableNames.size(), getTaskName());
- process(allTableNames);
- LOGGER.info("Finished processing {} tables in periodic task: {} in {}ms",
allTableNames.size(), getTaskName(),
+ int numTables = tables.size();
+ LOGGER.info("Start processing {} tables in periodic task: {}", numTables,
_taskName);
+ process(tables);
+ LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms",
numTables, _taskName,
(System.currentTimeMillis() - startTime));
}
- @VisibleForTesting
- public boolean getAmILeader() {
- return _amILeader;
- }
-
- @VisibleForTesting
- public void setAmILeader(boolean amILeader) {
- _amILeader = amILeader;
- }
-
/**
* Does the following logic when losing the leadership. This should be done
only once during leadership transition.
*/
@@ -105,8 +104,9 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
}
/**
- * Processes the periodic task as lead controller.
- * @param allTableNames List of all the table names
+ * Processes the task on the given tables.
+ *
+ * @param tables List of table names
*/
- public abstract void process(List<String> allTableNames);
+ public abstract void process(List<String> tables);
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 0954f5b..d152887 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,18 +58,19 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> allTableNames) {
- runRelocation(allTableNames);
+ public void process(List<String> tables) {
+ runRelocation(tables);
}
/**
- * Check all tables. Perform relocation of segments if table is realtime and
relocation is required
+ * Check the given tables. Perform relocation of segments if table is
realtime and relocation is required
* TODO: Model this to implement {@link
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy}
interface
* https://github.com/linkedin/pinot/issues/2609
- * @param allTableNames List of all the table names
+ *
+ * @param tables List of table names
*/
- private void runRelocation(List<String> allTableNames) {
- for (final String tableNameWithType : allTableNames) {
+ private void runRelocation(List<String> tables) {
+ for (String tableNameWithType : tables) {
// Only consider realtime tables.
if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
continue;
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 7423cef..d76cc3b 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -51,8 +51,7 @@ public class RetentionManager extends ControllerPeriodicTask {
public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
int runFrequencyInSeconds,
int deletedSegmentsRetentionInDays) {
- super("RetentionManager", runFrequencyInSeconds, Math.min(60,
runFrequencyInSeconds),
- pinotHelixResourceManager);
+ super("RetentionManager", runFrequencyInSeconds,
pinotHelixResourceManager);
_deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
}
@@ -63,17 +62,18 @@ public class RetentionManager extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> allTableNames) {
- execute(allTableNames);
+ public void process(List<String> tables) {
+ execute(tables);
}
/**
- * Manages retention for all tables.
- * @param allTableNames List of all the table names
+ * Manages retention for the given tables.
+ *
+ * @param tables List of table names
*/
- private void execute(List<String> allTableNames) {
+ private void execute(List<String> tables) {
try {
- for (String tableNameWithType : allTableNames) {
+ for (String tableNameWithType : tables) {
LOGGER.info("Start managing retention for table: {}",
tableNameWithType);
manageRetentionForTable(tableNameWithType);
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index ce2b404..8717318 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -60,8 +60,7 @@ public class ValidationManager extends ControllerPeriodicTask
{
public ValidationManager(ControllerConf config, PinotHelixResourceManager
pinotHelixResourceManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics) {
- super("ValidationManager",
config.getValidationControllerFrequencyInSeconds(),
- config.getValidationControllerFrequencyInSeconds() / 2,
pinotHelixResourceManager);
+ super("ValidationManager",
config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
_segmentLevelValidationIntervalInSeconds =
config.getSegmentLevelValidationIntervalInSeconds();
Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
@@ -75,15 +74,16 @@ public class ValidationManager extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> allTableNames) {
- runValidation(allTableNames);
+ public void process(List<String> tables) {
+ runValidation(tables);
}
/**
- * Runs a validation pass over the currently loaded tables.
- * @param allTableNames List of all the table names
+ * Runs a validation pass over the given tables.
+ *
+ * @param tables List of table names
*/
- private void runValidation(List<String> allTableNames) {
+ private void runValidation(List<String> tables) {
// Run segment level validation using a separate interval
boolean runSegmentLevelValidation = false;
long currentTimeMs = System.currentTimeMillis();
@@ -97,7 +97,7 @@ public class ValidationManager extends ControllerPeriodicTask
{
// Cache instance configs to reduce ZK access
List<InstanceConfig> instanceConfigs =
_pinotHelixResourceManager.getAllHelixInstanceConfigs();
- for (String tableNameWithType : allTableNames) {
+ for (String tableNameWithType : tables) {
try {
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 16248db..de6eca9 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -33,13 +33,9 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.testng.Assert;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class SegmentStatusCheckerTest {
@@ -49,18 +45,6 @@ public class SegmentStatusCheckerTest {
private ControllerMetrics controllerMetrics;
private ControllerConf config;
- @BeforeSuite
- public void setUp() throws Exception {
- }
-
- @AfterSuite
- public void tearDown() {
- }
-
- @BeforeMethod
- public void beforeMethod() {
- }
-
@Test
public void offlineBasicTest() throws Exception {
final String tableName = "myTable_OFFLINE";
@@ -104,7 +88,6 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -115,7 +98,6 @@ public class SegmentStatusCheckerTest {
ControllerGauge.PERCENT_OF_REPLICAS), 33);
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
- Assert.assertEquals(segmentStatusChecker.getAmILeader(), true);
}
@Test
@@ -172,7 +154,6 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -183,53 +164,6 @@ public class SegmentStatusCheckerTest {
ControllerGauge.PERCENT_OF_REPLICAS), 100);
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
- Assert.assertEquals(segmentStatusChecker.getAmILeader(), true);
- }
-
- @Test
- public void nonLeaderTest() throws Exception {
- final String tableName = "myTable_REALTIME";
- List<String> allTableNames = new ArrayList<String>();
- allTableNames.add(tableName);
-
- HelixAdmin helixAdmin;
- {
- helixAdmin = mock(HelixAdmin.class);
- }
- {
- helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(false);
- when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
-
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
- when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
- }
- {
- config = mock(ControllerConf.class);
- when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- metricsRegistry = new MetricsRegistry();
- controllerMetrics = new ControllerMetrics(metricsRegistry);
-
- // From non-leader to non-leader.
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
- segmentStatusChecker.run();
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
- Long.MIN_VALUE);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS),
- Long.MIN_VALUE);
- Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
-
- // Leadership transition from leader to non-leader.
- controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0L);
- segmentStatusChecker.setAmILeader(true);
- segmentStatusChecker.run();
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
- Long.MIN_VALUE);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS),
- Long.MIN_VALUE);
- Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
}
@Test
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 354456f..6a2c1b1 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -16,75 +16,91 @@
package com.linkedin.pinot.controller.helix.core.periodictask;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
-import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.Assert;
-import org.testng.annotations.BeforeTest;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
public class ControllerPeriodicTaskTest {
- private PinotHelixResourceManager helixResourceManager;
- private AtomicInteger numOfProcessingMessages;
-
- @BeforeTest
- public void setUp() {
- numOfProcessingMessages = new AtomicInteger(0);
- helixResourceManager = mock(PinotHelixResourceManager.class);
- List<String> allTableNames = new ArrayList<>();
- allTableNames.add("testTable_REALTIME");
- allTableNames.add("testTable_OFFLINE");
- when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
+ private static final long RUN_FREQUENCY_IN_SECONDS = 30;
+
+ private final PinotHelixResourceManager _resourceManager =
mock(PinotHelixResourceManager.class);
+ private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
+ private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+ private final AtomicBoolean _processCalled = new AtomicBoolean();
+
+ private final ControllerPeriodicTask _task =
+ new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS,
_resourceManager) {
+ @Override
+ public void onBecomeLeader() {
+ _onBecomeLeaderCalled.set(true);
+ }
+
+ @Override
+ public void onBecomeNotLeader() {
+ _onBecomeNonLeaderCalled.set(true);
+ }
+
+ @Override
+ public void process(List<String> tables) {
+ _processCalled.set(true);
+ }
+ };
+
+ private void resetState() {
+ _onBecomeLeaderCalled.set(false);
+ _onBecomeNonLeaderCalled.set(false);
+ _processCalled.set(false);
}
@Test
- public void testWhenControllerIsLeader() throws InterruptedException {
- long totalRunTimeInMilliseconds = 3_500L;
- long runFrequencyInSeconds = 1L;
- long initialDelayInSeconds = 1L;
- when(helixResourceManager.isLeader()).thenReturn(true);
-
- PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds,
initialDelayInSeconds);
-
- PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
- periodicTaskScheduler.start(Collections.singletonList(periodicTask));
- Thread.sleep(totalRunTimeInMilliseconds);
- periodicTaskScheduler.stop();
- Assert.assertEquals(totalRunTimeInMilliseconds / 1000L,
numOfProcessingMessages.get());
+ public void testRandomInitialDelay() {
+ assertTrue(_task.getInitialDelayInSeconds() >=
ControllerPeriodicTask.MIN_INITIAL_DELAY_IN_SECONDS);
+ assertTrue(_task.getInitialDelayInSeconds() <
ControllerPeriodicTask.MAX_INITIAL_DELAY_IN_SECONDS);
+
+ assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS);
}
@Test
- public void testWhenControllerIsNotLeader() throws InterruptedException {
- long totalRunTimeInMilliseconds = 3_500L;
- long runFrequencyInSeconds = 1L;
- long initialDelayInSeconds = 1L;
-
- when(helixResourceManager.isLeader()).thenReturn(false);
- PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds,
initialDelayInSeconds);
-
- PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
- periodicTaskScheduler.start(Collections.singletonList(periodicTask));
- Thread.sleep(totalRunTimeInMilliseconds);
- periodicTaskScheduler.stop();
- Assert.assertEquals(0, numOfProcessingMessages.get());
- }
+ public void testChangeLeadership() {
+ // Initial state
+ resetState();
+ _task.init();
+ assertFalse(_onBecomeLeaderCalled.get());
+ assertFalse(_onBecomeNonLeaderCalled.get());
+ assertFalse(_processCalled.get());
+
+ // From non-leader to non-leader
+ resetState();
+ _task.run();
+ assertFalse(_onBecomeLeaderCalled.get());
+ assertFalse(_onBecomeNonLeaderCalled.get());
+ assertFalse(_processCalled.get());
+
+ // From non-leader to leader
+ resetState();
+ when(_resourceManager.isLeader()).thenReturn(true);
+ _task.run();
+ assertTrue(_onBecomeLeaderCalled.get());
+ assertFalse(_onBecomeNonLeaderCalled.get());
+ assertTrue(_processCalled.get());
+
+ // From leader to leader
+ resetState();
+ _task.run();
+ assertFalse(_onBecomeLeaderCalled.get());
+ assertFalse(_onBecomeNonLeaderCalled.get());
+ assertTrue(_processCalled.get());
- private PeriodicTask createMockPeriodicTask(long runFrequencyInSeconds, long
initialDelayInSeconds) {
- return new ControllerPeriodicTask("Task", runFrequencyInSeconds,
initialDelayInSeconds, helixResourceManager) {
- public void init() {
- numOfProcessingMessages.set(0);
- }
-
- @Override
- public void process(List<String> allTableNames) {
- numOfProcessingMessages.incrementAndGet();
- }
- };
+ // From leader to non-leader
+ resetState();
+ when(_resourceManager.isLeader()).thenReturn(false);
+ _task.run();
+ assertFalse(_onBecomeLeaderCalled.get());
+ assertTrue(_onBecomeNonLeaderCalled.get());
+ assertFalse(_processCalled.get());
}
}
diff --git
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
index a1f4207..15ee475 100644
---
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
+++
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
@@ -19,9 +19,9 @@ package com.linkedin.pinot.core.periodictask;
* A base class to implement periodic task interface.
*/
public abstract class BasePeriodicTask implements PeriodicTask {
- private final String _taskName;
- private long _intervalInSeconds;
- private long _initialDelayInSeconds;
+ protected final String _taskName;
+ protected final long _intervalInSeconds;
+ protected final long _initialDelayInSeconds;
public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long
initialDelayInSeconds) {
_taskName = taskName;
@@ -43,4 +43,10 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
public String getTaskName() {
return _taskName;
}
+
+ @Override
+ public String toString() {
+ return String.format("Task: %s, Interval: %ds, Initial Delay: %ds",
_taskName, _intervalInSeconds,
+ _initialDelayInSeconds);
+ }
}
diff --git
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index f58fffc..cd07ea4 100644
---
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.core.periodictask;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -22,51 +23,58 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Periodic task scheduler will schedule a list of tasks based on their
initial delay time and interval time.
*/
public class PeriodicTaskScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
- private static final int CORE_POOL_SIZE = 5;
- private final ScheduledExecutorService _executorService;
- public PeriodicTaskScheduler() {
- LOGGER.info("Initializing PeriodicTaskScheduler.");
- _executorService = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
- }
+ private ScheduledExecutorService _executorService;
/**
* Start scheduling periodic tasks.
*/
public void start(List<PeriodicTask> periodicTasks) {
- if (periodicTasks == null || periodicTasks.isEmpty()) {
- LOGGER.warn("No periodic task assigned to scheduler!");
- return;
+ if (_executorService != null) {
+ LOGGER.warn("Periodic task scheduler already started");
}
- if (periodicTasks.size() > CORE_POOL_SIZE) {
- LOGGER.warn("The number of tasks:{} is more than the default number of
threads:{}.", periodicTasks.size(),
- CORE_POOL_SIZE);
+ List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+ for (PeriodicTask periodicTask : periodicTasks) {
+ if (periodicTask.getIntervalInSeconds() > 0) {
+ LOGGER.info("Adding periodic task: {}", periodicTask);
+ tasksWithValidInterval.add(periodicTask);
+ } else {
+ LOGGER.info("Skipping periodic task: {}", periodicTask);
+ }
}
- LOGGER.info("Starting PeriodicTaskScheduler.");
- // Set up an executor that executes tasks periodically
- for (PeriodicTask periodicTask : periodicTasks) {
- periodicTask.init();
- _executorService.scheduleWithFixedDelay(() -> {
- try {
- periodicTask.run();
- } catch (Throwable e) {
- // catch all errors to prevent subsequent executions from being
silently suppressed
- // Ref:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
- LOGGER.warn("Caught exception while running Task: {}",
periodicTask.getTaskName(), e);
- }
- }, periodicTask.getInitialDelayInSeconds(),
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+ if (tasksWithValidInterval.isEmpty()) {
+ LOGGER.warn("No periodic task scheduled");
+ } else {
+ LOGGER.info("Starting periodic task scheduler with tasks: {}",
tasksWithValidInterval);
+ _executorService =
Executors.newScheduledThreadPool(tasksWithValidInterval.size());
+ for (PeriodicTask periodicTask : tasksWithValidInterval) {
+ periodicTask.init();
+ _executorService.scheduleWithFixedDelay(() -> {
+ try {
+ periodicTask.run();
+ } catch (Throwable e) {
+ // catch all errors to prevent subsequent executions from being
silently suppressed
+ // Ref:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
+ LOGGER.warn("Caught exception while running Task: {}",
periodicTask.getTaskName(), e);
+ }
+ }, periodicTask.getInitialDelayInSeconds(),
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+ }
}
}
public void stop() {
- LOGGER.info("Stopping PeriodicTaskScheduler");
- _executorService.shutdown();
+ if (_executorService != null) {
+ LOGGER.info("Stopping periodic task scheduler");
+ _executorService.shutdown();
+ _executorService = null;
+ }
}
}
diff --git
a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a2ff21e..a105435 100644
---
a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++
b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -16,171 +16,70 @@
package com.linkedin.pinot.core.periodictask;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.*;
-public class PeriodicTaskSchedulerTest {
-
- @Test
- public void testSchedulerWithOneTask() throws InterruptedException {
- AtomicInteger count = new AtomicInteger(0);
- PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
- long runFrequencyInSeconds = 1L;
- long initialDelayInSeconds = 1L;
- long totalRunTimeInMilliseconds = 3_500L;
-
- List<PeriodicTask> periodicTasks = new ArrayList<>();
- PeriodicTask task = new BasePeriodicTask("Task", runFrequencyInSeconds,
initialDelayInSeconds) {
- @Override
- public void init() {
- count.set(0);
- }
-
- @Override
- public void run() {
- // Execute task.
- count.incrementAndGet();
- }
- };
- periodicTasks.add(task);
-
- long start = System.currentTimeMillis();
- periodicTaskScheduler.start(periodicTasks);
- Thread.sleep(totalRunTimeInMilliseconds);
- periodicTaskScheduler.stop();
-
- Assert.assertTrue(count.get() > 0);
- Assert.assertEquals(count.get(), (totalRunTimeInMilliseconds /
(runFrequencyInSeconds * 1000)));
- Assert.assertTrue(totalRunTimeInMilliseconds <=
(System.currentTimeMillis() - start));
- }
+public class PeriodicTaskSchedulerTest {
@Test
- public void testSchedulerWithTwoStaggeredTasks() throws InterruptedException
{
- AtomicInteger count = new AtomicInteger(0);
- PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
- long runFrequencyInSeconds = 2L;
- long totalRunTimeInMilliseconds = 1_500L;
-
- List<PeriodicTask> periodicTasks = new ArrayList<>();
- PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds,
0L) {
- @Override
- public void init() {
- }
-
- @Override
- public void run() {
- // Execute task.
- count.incrementAndGet();
- }
- };
- periodicTasks.add(task1);
+ public void testTaskWithInvalidInterval() throws Exception {
+ AtomicBoolean initCalled = new AtomicBoolean();
+ AtomicBoolean runCalled = new AtomicBoolean();
- // Stagger 2 tasks by delaying the 2nd task half of the frequency.
- PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds,
runFrequencyInSeconds / 2) {
+ List<PeriodicTask> periodicTasks = Collections.singletonList(new
BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
@Override
public void init() {
+ initCalled.set(true);
}
@Override
public void run() {
- // Execute task.
- count.decrementAndGet();
+ runCalled.set(true);
}
- };
- periodicTasks.add(task2);
+ });
- long start = System.currentTimeMillis();
- periodicTaskScheduler.start(periodicTasks);
- Thread.sleep(totalRunTimeInMilliseconds);
- periodicTaskScheduler.stop();
+ PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+ taskScheduler.start(periodicTasks);
+ Thread.sleep(100L);
+ taskScheduler.stop();
- Assert.assertEquals(count.get(), 0);
- Assert.assertTrue(totalRunTimeInMilliseconds <=
(System.currentTimeMillis() - start));
+ assertFalse(initCalled.get());
+ assertFalse(runCalled.get());
}
@Test
- public void testSchedulerWithTwoTasksDifferentFrequencies() throws
InterruptedException {
- long startTime = System.currentTimeMillis();
- AtomicLong count = new AtomicLong(startTime);
- AtomicLong count2 = new AtomicLong(startTime);
- final long[] maxRunTimeForTask1 = {0L};
- final long[] maxRunTimeForTask2 = {0L};
- PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
- long runFrequencyInSeconds = 1L;
- long totalRunTimeInMilliseconds = 10_000L;
-
- List<PeriodicTask> periodicTasks = new ArrayList<>();
- PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds,
0L) {
- @Override
- public void init() {
- }
-
- @Override
- public void run() {
- // Calculate the max waiting time between the same task.
- long lastTime = count.get();
- long now = System.currentTimeMillis();
- maxRunTimeForTask1[0] = Math.max(maxRunTimeForTask1[0], (now -
lastTime));
- count.set(now);
- }
- };
- periodicTasks.add(task1);
-
- // The time for Task 2 to run is 4 seconds, which is higher than the
interval time of Task 1.
- long TimeToRunMs = 4_000L;
- PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds *
3, 0L) {
- @Override
- public void init() {
- }
-
- @Override
- public void run() {
- // Calculate the max waiting time between the same task.
- long lastTime = count2.get();
- long now = System.currentTimeMillis();
- maxRunTimeForTask2[0] = Math.max(maxRunTimeForTask2[0], (now -
lastTime));
- count2.set(now);
- try {
- Thread.sleep(TimeToRunMs);
- } catch (InterruptedException e) {
- e.printStackTrace();
+ public void testScheduleMultipleTasks() throws Exception {
+ int numTasks = 3;
+ AtomicInteger numTimesInitCalled = new AtomicInteger();
+ AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+ List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ periodicTasks.add(new BasePeriodicTask("Task", 1L, 0L) {
+ @Override
+ public void init() {
+ numTimesInitCalled.getAndIncrement();
}
- }
- };
- periodicTasks.add(task2);
-
- periodicTaskScheduler.start(periodicTasks);
- Thread.sleep(totalRunTimeInMilliseconds);
-
- periodicTaskScheduler.stop();
- Assert.assertTrue(count.get() > startTime);
- Assert.assertTrue(count2.get() > startTime);
- // Task1 didn't waited until Task2 finished.
- Assert.assertTrue(maxRunTimeForTask1[0] - task1.getIntervalInSeconds() *
1000L < 100L);
- Assert.assertTrue(maxRunTimeForTask2[0] >=
Math.max(task2.getIntervalInSeconds() * 1000L, TimeToRunMs));
- }
-
- @Test
- public void testNoTaskAssignedToQueue() throws InterruptedException {
- AtomicInteger count = new AtomicInteger(0);
- PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
- long totalRunTimeInMilliseconds = 2_000L;
-
- // An empty list.
- List<PeriodicTask> periodicTasks = new ArrayList<>();
- long start = System.currentTimeMillis();
- periodicTaskScheduler.start(periodicTasks);
- Thread.sleep(totalRunTimeInMilliseconds);
+ @Override
+ public void run() {
+ numTimesRunCalled.getAndIncrement();
+ }
+ });
+ }
- periodicTaskScheduler.stop();
+ PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+ taskScheduler.start(periodicTasks);
+ Thread.sleep(1100L);
+ taskScheduler.stop();
- Assert.assertEquals(count.get(), 0);
- Assert.assertTrue(totalRunTimeInMilliseconds <=
(System.currentTimeMillis() - start));
+ assertEquals(numTimesInitCalled.get(), numTasks);
+ assertEquals(numTimesRunCalled.get(), numTasks * 2);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]