This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a1565c  Allow configuring minion task timeout in the PinotTaskGenerator (#5317)
6a1565c is described below

commit 6a1565cd053016d983cb0685b63bd93d55416507
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Apr 29 20:29:46 2020 -0700

    Allow configuring minion task timeout in the PinotTaskGenerator (#5317)
    
    The default task timeout is set to 1 hour. For certain expensive tasks, this might not be enough.
    This change makes the timeout configurable for each task type.
---
 .../core/minion/PinotHelixTaskResourceManager.java | 13 +++++++-----
 .../helix/core/minion/PinotTaskManager.java        |  3 ++-
 .../generator/ConvertToRawIndexTaskGenerator.java  | 16 ++-------------
 .../core/minion/generator/PinotTaskGenerator.java  | 24 ++++++++++++++--------
 .../tests/SimpleMinionClusterIntegrationTest.java  | 13 +-----------
 5 files changed, 29 insertions(+), 40 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 5dadef9..096176b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -174,11 +174,13 @@ public class PinotHelixTaskResourceManager {
    * Submit a list of child tasks with same task type to the Minion instances 
with the default tag.
    *
    * @param pinotTaskConfigs List of child task configs to be submitted
+   * @param taskTimeoutMs Timeout in milliseconds for each task
    * @param numConcurrentTasksPerInstance Maximum number of concurrent tasks 
allowed per instance
    * @return Name of the submitted parent task
    */
-  public synchronized String submitTask(List<PinotTaskConfig> 
pinotTaskConfigs, int numConcurrentTasksPerInstance) {
-    return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE, 
numConcurrentTasksPerInstance);
+  public synchronized String submitTask(List<PinotTaskConfig> 
pinotTaskConfigs, long taskTimeoutMs,
+      int numConcurrentTasksPerInstance) {
+    return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE, 
taskTimeoutMs, numConcurrentTasksPerInstance);
   }
 
   /**
@@ -186,11 +188,12 @@ public class PinotHelixTaskResourceManager {
    *
    * @param pinotTaskConfigs List of child task configs to be submitted
    * @param minionInstanceTag Tag of the Minion instances to submit the task to
+   * @param taskTimeoutMs Timeout in milliseconds for each task
    * @param numConcurrentTasksPerInstance Maximum number of concurrent tasks 
allowed per instance
    * @return Name of the submitted parent task
    */
   public synchronized String submitTask(List<PinotTaskConfig> 
pinotTaskConfigs, String minionInstanceTag,
-      int numConcurrentTasksPerInstance) {
+      long taskTimeoutMs, int numConcurrentTasksPerInstance) {
     int numChildTasks = pinotTaskConfigs.size();
     Preconditions.checkState(numChildTasks > 0);
     Preconditions.checkState(numConcurrentTasksPerInstance > 0);
@@ -212,8 +215,8 @@ public class PinotHelixTaskResourceManager {
     // don't want one task failure affects other tasks. Also, if one task 
failed, next time we will re-schedule it
     JobConfig.Builder jobBuilder =
         new 
JobConfig.Builder().addTaskConfigs(helixTaskConfigs).setInstanceGroupTag(minionInstanceTag)
-            
.setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance).setIgnoreDependentJobFailure(true)
-            .setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
+            
.setTimeoutPerTask(taskTimeoutMs).setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance)
+            
.setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE);
     _taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName, 
jobBuilder);
 
     // Wait until task state is available
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 866dd2b..93422f7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -144,7 +144,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
         LOGGER
             .info("Submitting {} tasks for task type: {} with task configs: 
{}", numTasks, taskType, pinotTaskConfigs);
         tasksScheduled.put(taskType, _helixTaskResourceManager
-            .submitTask(pinotTaskConfigs, 
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
+            .submitTask(pinotTaskConfigs, 
pinotTaskGenerator.getTaskTimeoutMs(),
+                pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
         _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
       }
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
index c81e1e6..dc4acf7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.Segment;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
@@ -46,15 +45,13 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
     _clusterInfoProvider = clusterInfoProvider;
   }
 
-  @Nonnull
   @Override
   public String getTaskType() {
     return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
   }
 
-  @Nonnull
   @Override
-  public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> 
tableConfigs) {
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
 
     // Get the segments that are being converted so that we don't submit them 
again
@@ -80,7 +77,7 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
       String tableMaxNumTasksConfig = 
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
       if (tableMaxNumTasksConfig != null) {
         try {
-          tableMaxNumTasks = Integer.valueOf(tableMaxNumTasksConfig);
+          tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
         } catch (Exception e) {
           tableMaxNumTasks = Integer.MAX_VALUE;
         }
@@ -127,13 +124,4 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
 
     return pinotTaskConfigs;
   }
-
-  @Override
-  public int getNumConcurrentTasksPerInstance() {
-    return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-  }
-
-  @Override
-  public void nonLeaderCleanUp() {
-  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index 823d6aa..6fe55ee 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix.core.minion.generator;
 
 import java.util.List;
-import javax.annotation.Nonnull;
 import org.apache.helix.task.JobConfig;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -29,14 +28,12 @@ import org.apache.pinot.spi.config.table.TableConfig;
  * The interface <code>PinotTaskGenerator</code> defines the APIs for task 
generators.
  */
 public interface PinotTaskGenerator {
-  int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 
JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
 
   /**
    * Returns the task type of the generator.
    *
    * @return Task type of the generator
    */
-  @Nonnull
   String getTaskType();
 
   /**
@@ -44,18 +41,29 @@ public interface PinotTaskGenerator {
    *
    * @return List of tasks to schedule
    */
-  @Nonnull
-  List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs);
+  List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
 
   /**
-   * Returns the maximum number of concurrent tasks allowed per instance.
+   * Returns the timeout in milliseconds for each task, 3600000 (1 hour) by 
default.
+   *
+   * @return Timeout in milliseconds for each task.
+   */
+  default long getTaskTimeoutMs() {
+    return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
+  }
+
+  /**
+   * Returns the maximum number of concurrent tasks allowed per instance, 1 by 
default.
    *
    * @return Maximum number of concurrent tasks allowed per instance
    */
-  int getNumConcurrentTasksPerInstance();
+  default int getNumConcurrentTasksPerInstance() {
+    return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  }
 
   /**
    * Performs necessary cleanups (e.g. remove metrics) when the controller 
leadership changes.
    */
-  void nonLeaderCleanUp();
+  default void nonLeaderCleanUp() {
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index fd4b403..fd4ea11 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -205,15 +205,13 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
       _clusterInfoProvider = clusterInfoProvider;
     }
 
-    @Nonnull
     @Override
     public String getTaskType() {
       return TASK_TYPE;
     }
 
-    @Nonnull
     @Override
-    public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> 
tableConfigs) {
+    public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) 
{
       assertEquals(tableConfigs.size(), 2);
 
       // Generate at most 2 tasks
@@ -230,15 +228,6 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
       }
       return taskConfigs;
     }
-
-    @Override
-    public int getNumConcurrentTasksPerInstance() {
-      return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-    }
-
-    @Override
-    public void nonLeaderCleanUp() {
-    }
   }
 
   public static class TestTaskExecutorFactory implements 
PinotTaskExecutorFactory {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to