This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6a1565c Allow configuring minion task timeout in the
PinotTaskGenerator (#5317)
6a1565c is described below
commit 6a1565cd053016d983cb0685b63bd93d55416507
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Apr 29 20:29:46 2020 -0700
Allow configuring minion task timeout in the PinotTaskGenerator (#5317)
The default task timeout is set to 1 hour. For certain expensive tasks,
this might not be enough.
This commit makes the timeout configurable for each task type.
---
.../core/minion/PinotHelixTaskResourceManager.java | 13 +++++++-----
.../helix/core/minion/PinotTaskManager.java | 3 ++-
.../generator/ConvertToRawIndexTaskGenerator.java | 16 ++-------------
.../core/minion/generator/PinotTaskGenerator.java | 24 ++++++++++++++--------
.../tests/SimpleMinionClusterIntegrationTest.java | 13 +-----------
5 files changed, 29 insertions(+), 40 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 5dadef9..096176b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -174,11 +174,13 @@ public class PinotHelixTaskResourceManager {
* Submit a list of child tasks with same task type to the Minion instances
with the default tag.
*
* @param pinotTaskConfigs List of child task configs to be submitted
+ * @param taskTimeoutMs Timeout in milliseconds for each task
* @param numConcurrentTasksPerInstance Maximum number of concurrent tasks
allowed per instance
* @return Name of the submitted parent task
*/
- public synchronized String submitTask(List<PinotTaskConfig>
pinotTaskConfigs, int numConcurrentTasksPerInstance) {
- return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE,
numConcurrentTasksPerInstance);
+ public synchronized String submitTask(List<PinotTaskConfig>
pinotTaskConfigs, long taskTimeoutMs,
+ int numConcurrentTasksPerInstance) {
+ return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE,
taskTimeoutMs, numConcurrentTasksPerInstance);
}
/**
@@ -186,11 +188,12 @@ public class PinotHelixTaskResourceManager {
*
* @param pinotTaskConfigs List of child task configs to be submitted
* @param minionInstanceTag Tag of the Minion instances to submit the task to
+ * @param taskTimeoutMs Timeout in milliseconds for each task
* @param numConcurrentTasksPerInstance Maximum number of concurrent tasks
allowed per instance
* @return Name of the submitted parent task
*/
public synchronized String submitTask(List<PinotTaskConfig>
pinotTaskConfigs, String minionInstanceTag,
- int numConcurrentTasksPerInstance) {
+ long taskTimeoutMs, int numConcurrentTasksPerInstance) {
int numChildTasks = pinotTaskConfigs.size();
Preconditions.checkState(numChildTasks > 0);
Preconditions.checkState(numConcurrentTasksPerInstance > 0);
@@ -212,8 +215,8 @@ public class PinotHelixTaskResourceManager {
// don't want one task failure affects other tasks. Also, if one task
failed, next time we will re-schedule it
JobConfig.Builder jobBuilder =
new
JobConfig.Builder().addTaskConfigs(helixTaskConfigs).setInstanceGroupTag(minionInstanceTag)
-
.setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance).setIgnoreDependentJobFailure(true)
- .setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
+
.setTimeoutPerTask(taskTimeoutMs).setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance)
+
.setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
_taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName,
jobBuilder);
// Wait until task state is available
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 866dd2b..93422f7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -144,7 +144,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
LOGGER
.info("Submitting {} tasks for task type: {} with task configs:
{}", numTasks, taskType, pinotTaskConfigs);
tasksScheduled.put(taskType, _helixTaskResourceManager
- .submitTask(pinotTaskConfigs,
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
+ .submitTask(pinotTaskConfigs,
pinotTaskGenerator.getTaskTimeoutMs(),
+ pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
_controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
index c81e1e6..dc4acf7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nonnull;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
@@ -46,15 +45,13 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
_clusterInfoProvider = clusterInfoProvider;
}
- @Nonnull
@Override
public String getTaskType() {
return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
}
- @Nonnull
@Override
- public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig>
tableConfigs) {
+ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
// Get the segments that are being converted so that we don't submit them
again
@@ -80,7 +77,7 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
String tableMaxNumTasksConfig =
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
- tableMaxNumTasks = Integer.valueOf(tableMaxNumTasksConfig);
+ tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
tableMaxNumTasks = Integer.MAX_VALUE;
}
@@ -127,13 +124,4 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
return pinotTaskConfigs;
}
-
- @Override
- public int getNumConcurrentTasksPerInstance() {
- return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
- }
-
- @Override
- public void nonLeaderCleanUp() {
- }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index 823d6aa..6fe55ee 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -19,7 +19,6 @@
package org.apache.pinot.controller.helix.core.minion.generator;
import java.util.List;
-import javax.annotation.Nonnull;
import org.apache.helix.task.JobConfig;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -29,14 +28,12 @@ import org.apache.pinot.spi.config.table.TableConfig;
* The interface <code>PinotTaskGenerator</code> defines the APIs for task
generators.
*/
public interface PinotTaskGenerator {
- int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE =
JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
/**
* Returns the task type of the generator.
*
* @return Task type of the generator
*/
- @Nonnull
String getTaskType();
/**
@@ -44,18 +41,29 @@ public interface PinotTaskGenerator {
*
* @return List of tasks to schedule
*/
- @Nonnull
- List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs);
+ List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
/**
- * Returns the maximum number of concurrent tasks allowed per instance.
+ * Returns the timeout in milliseconds for each task, 3600000 (1 hour) by
default.
+ *
+ * @return Timeout in milliseconds for each task.
+ */
+ default long getTaskTimeoutMs() {
+ return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
+ }
+
+ /**
+ * Returns the maximum number of concurrent tasks allowed per instance, 1 by
default.
*
* @return Maximum number of concurrent tasks allowed per instance
*/
- int getNumConcurrentTasksPerInstance();
+ default int getNumConcurrentTasksPerInstance() {
+ return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ }
/**
* Performs necessary cleanups (e.g. remove metrics) when the controller
leadership changes.
*/
- void nonLeaderCleanUp();
+ default void nonLeaderCleanUp() {
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index fd4b403..fd4ea11 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -205,15 +205,13 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
_clusterInfoProvider = clusterInfoProvider;
}
- @Nonnull
@Override
public String getTaskType() {
return TASK_TYPE;
}
- @Nonnull
@Override
- public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig>
tableConfigs) {
+ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs)
{
assertEquals(tableConfigs.size(), 2);
// Generate at most 2 tasks
@@ -230,15 +228,6 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
}
return taskConfigs;
}
-
- @Override
- public int getNumConcurrentTasksPerInstance() {
- return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
- }
-
- @Override
- public void nonLeaderCleanUp() {
- }
}
public static class TestTaskExecutorFactory implements
PinotTaskExecutorFactory {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]