This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f9f89a79768156ef7341262cbb25a40d7dafeb1e
Author: narendly <naren...@gmail.com>
AuthorDate: Mon Feb 25 17:51:17 2019 -0800

    [HELIX-795] TASK: Drop tasks upon Participant reconnect
    
    This changes the default reset() behavior for tasks on Participants. 
Previously, reset() would send all task partitions to INIT. After this change, 
the task partitions inherit their states from the previous session, and their 
RequestedState is set to DROPPED. The Controller then sends messages to drop 
those task partitions, so that task quota/resources on Participants are not 
leaked.
    Changelist:
    1. Modify state transition logic so that drop state transition messages 
are honored
    2. Modify CurrentState copy-over logic
    3. Add an integration test: TestDropOnParticipantReset
---
 .../helix/manager/zk/CurStateCarryOverUpdater.java | 18 +++-
 .../helix/participant/HelixStateMachineEngine.java | 16 +++-
 .../java/org/apache/helix/task/JobDispatcher.java  | 45 +++++++---
 .../java/org/apache/helix/task/TaskStateModel.java | 13 ++-
 .../task/TestDropOnParticipantReset.java           | 95 ++++++++++++++++++++++
 5 files changed, 167 insertions(+), 20 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
index b96de18..f52a669 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
@@ -22,6 +22,9 @@ package org.apache.helix.manager.zk;
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
+
 
 /**
  * updater for carrying over last current states
@@ -47,7 +50,7 @@ class CurStateCarryOverUpdater implements 
DataUpdater<ZNRecord> {
 
   @Override
   public ZNRecord update(ZNRecord currentData) {
-    CurrentState curState = null;
+    CurrentState curState;
     if (currentData == null) {
       curState = new CurrentState(_lastCurState.getId());
       // copy all simple fields settings and overwrite session-id to current 
session
@@ -58,9 +61,16 @@ class CurStateCarryOverUpdater implements 
DataUpdater<ZNRecord> {
     }
 
     for (String partitionName : _lastCurState.getPartitionStateMap().keySet()) 
{
-      // carry-over only when current-state not exist
-      if (curState.getState(partitionName) == null) {
-        curState.setState(partitionName, _initState);
+      // For tasks, we preserve previous session's CurrentStates and set 
RequestState to DROPPED so
+      // that they will be dropped by the Controller
+      if 
(_lastCurState.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
+        curState.setState(partitionName, 
_lastCurState.getState(partitionName));
+        curState.setRequestedState(partitionName, 
TaskPartitionState.DROPPED.name());
+      } else {
+        // carry-over only when current-state does not exist for regular Helix 
resource partitions
+        if (curState.getState(partitionName) == null) {
+          curState.setState(partitionName, _initState);
+        }
       }
     }
     return curState.getRecord();
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index e235201..c75b0b8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -45,6 +45,8 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,14 +217,22 @@ public class HelixStateMachineEngine implements 
StateMachineEngine {
       _stateModelDefs.put(stateModelName, stateModelDef);
     }
 
-    if (message.getBatchMessageMode() == false) {
+    if (!message.getBatchMessageMode()) {
       String initState = 
_stateModelDefs.get(message.getStateModelDef()).getInitialState();
       StateModel stateModel = stateModelFactory.getStateModel(resourceName, 
partitionKey);
       if (stateModel == null) {
         stateModel = stateModelFactory.createAndAddStateModel(resourceName, 
partitionKey);
-        stateModel.updateState(initState);
+        if (stateModelName.equals(TaskConstants.STATE_MODEL_NAME)
+            && message.getToState().equals(TaskPartitionState.DROPPED.name())) 
{
+          // If stateModel is null, that means there was a reboot of the 
Participant. Then the
+          // purpose of this first message must be to drop the task. We 
manually set the current
+          // state to be the same state of fromState (which Controller 
inferred from JobContext) to
+          // allow the Participant to successfully process this dropping 
transition
+          stateModel.updateState(message.getFromState());
+        } else {
+          stateModel.updateState(initState);
+        }
       }
-
       if 
(message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) 
{
         return new HelixStateTransitionCancellationHandler(stateModel, 
message, context);
       } else {
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 408c9f1..d72db7f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -25,11 +25,11 @@ import org.slf4j.LoggerFactory;
 public class JobDispatcher extends AbstractTaskDispatcher {
   private static final Logger LOG = 
LoggerFactory.getLogger(JobDispatcher.class);
 
-  private static final Set<TaskState> intemediateStates = new HashSet<>(
-      Arrays.asList(TaskState.IN_PROGRESS, TaskState.NOT_STARTED, 
TaskState.STOPPING, TaskState.STOPPED));
+  // Intermediate states (meaning they are not terminal states) for workflows 
and jobs
+  private static final Set<TaskState> INTERMEDIATE_STATES = new 
HashSet<>(Arrays
+      .asList(TaskState.IN_PROGRESS, TaskState.NOT_STARTED, 
TaskState.STOPPING, TaskState.STOPPED));
   private WorkflowControllerDataProvider _dataProvider;
 
-
   public void updateCache(WorkflowControllerDataProvider cache) {
     _dataProvider = cache;
   }
@@ -127,7 +127,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     jobState = workflowCtx.getJobState(jobName);
     workflowState = workflowCtx.getWorkflowState();
 
-    if (intemediateStates.contains(jobState) && 
(isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
+    if (INTERMEDIATE_STATES.contains(jobState) && 
(isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
         || TaskState.TIMED_OUT.equals(workflowState))) {
       jobState = TaskState.TIMING_OUT;
       workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
@@ -206,7 +206,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
         getPrevInstanceToTaskAssignments(liveInstances, 
prevTaskToInstanceStateAssignment,
-            allPartitions);
+            allPartitions, currStateOutput, jobResource);
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
@@ -365,28 +365,53 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
    * @param liveInstances
    * @param prevAssignment task partition -> (instance -> state)
    * @param allTaskPartitions all task partitionIds
+   * @param currStateOutput currentStates to make sure currentStates copied 
over expired sessions
+   *          are accounted for
+   * @param jobName job name
    * @return instance -> partitionIds from previous assignment, if the 
instance is still live
    */
   private static Map<String, SortedSet<Integer>> 
getPrevInstanceToTaskAssignments(
       Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, 
SortedSet<Integer>>();
+      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, 
String jobName) {
+    Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<Integer>());
     }
 
+    // First, add all task partitions from JobContext
     for (Partition partition : prevAssignment.getMappedPartitions()) {
       int pId = TaskUtil.getPartitionId(partition.getPartitionName());
       if (allTaskPartitions.contains(pId)) {
         Map<String, String> replicaMap = 
prevAssignment.getReplicaMap(partition);
         for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance);
-          if (pList != null) {
-            pList.add(pId);
+          SortedSet<Integer> pIdSet = result.get(instance);
+          if (pIdSet != null) {
+            pIdSet.add(pId);
           }
         }
       }
     }
+
+    // Add all pIds existing in CurrentStateOutput as well because task 
currentStates copied over
+    // from previous sessions won't show up in prevInstanceToTaskAssignments
+    // We need to add these back here in order for these task partitions to be 
dropped (after a
+    // copy-over, the Controller will send a message to drop the state 
currentState)
+    // partitions: (partition -> instance -> currentState)
+    Map<Partition, Map<String, String>> partitions = 
currStateOutput.getCurrentStateMap(jobName);
+    for (Map.Entry<Partition, Map<String, String>> entry : 
partitions.entrySet()) {
+      // Get all (instance -> currentState) mappings
+      for (Map.Entry<String, String> instanceToCurrState : 
entry.getValue().entrySet()) {
+        String instance = instanceToCurrState.getKey();
+        String requestedState =
+            currStateOutput.getRequestedState(jobName, entry.getKey(), 
instance);
+        int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
+        if (result.containsKey(instance) && requestedState != null
+            && requestedState.equals(TaskPartitionState.DROPPED.name())) {
+          // Only if this instance is live and requestedState is DROPPED
+          result.get(instance).add(pId);
+        }
+      }
+    }
     return result;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 90ab2fb..34d6842 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -21,7 +21,9 @@ package org.apache.helix.task;
 
 import java.util.Map;
 import java.util.TimerTask;
-import java.util.concurrent.*;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
@@ -189,8 +191,13 @@ public class TaskStateModel extends StateModel {
   public void onBecomeDroppedFromRunning(Message msg, NotificationContext 
context) {
     String taskPartition = msg.getPartitionName();
     if (_taskRunner == null) {
-      throw new IllegalStateException(String.format(
-          "Invalid state transition. There is no running task for partition 
%s.", taskPartition));
+      if (timeout_task != null) {
+        timeout_task.cancel(true);
+      }
+      LOG.error(
+          "The thread running the task partition {} was not found while 
attempting to cancel this task; Manual cleanup may be required for this task.",
+          taskPartition);
+      return;
     }
 
     _taskRunner.cancel();
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropOnParticipantReset.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropOnParticipantReset.java
new file mode 100644
index 0000000..5dbde94
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropOnParticipantReset.java
@@ -0,0 +1,95 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDropOnParticipantReset extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 0;
+    _numPartitions = 0;
+    _numReplicas = 0;
+    _numNodes = 1; // Only bring up 1 instance
+    super.beforeClass();
+  }
+
+  /**
+   * Tests that upon Participant reconnect, the Controller correctly sends the 
current
+   * state of the task partition to DROPPED. This is to avoid a resource leak 
in case of a
+   * Participant disconnect/reconnect and an ensuing reset() on all of the 
partitions on that
+   * Participant.
+   */
+  @Test
+  public void testDropOnParticipantReset() throws InterruptedException {
+    // Create a workflow with some long-running jobs in progress
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB";
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    for (int j = 0; j < 2; j++) { // 2 tasks to ensure that they execute
+      String taskID = jobName + "_TASK_" + j;
+      TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+      taskConfigBuilder.setTaskId(taskID).setCommand(MockTask.TASK_COMMAND)
+          .addConfig(MockTask.JOB_DELAY, "3000");
+      taskConfigs.add(taskConfigBuilder.build());
+    }
+    JobConfig.Builder jobBuilder = new 
JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        
.setMaxAttemptsPerTask(10).setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+        .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true)
+        // 1 task at a time
+        .setNumConcurrentTasksPerInstance(1);
+    builder.addJob(jobName, jobBuilder);
+
+    // Modify maxConcurrentTask for the instance so that it only accepts 1 
task at most
+    InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName());
+    instanceConfig.setMaxConcurrentTask(1);
+    _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME,
+        _participants[0].getInstanceName(), instanceConfig);
+
+    // Start the workflow
+    _driver.start(builder.build());
+    _driver.pollForJobState(workflowName, workflowName + "_" + jobName, 
TaskState.IN_PROGRESS);
+    Thread.sleep(1500L); // Wait for the Participant to process the message
+    // Stop and start the participant to mimic a connection issue
+    _participants[0].syncStop();
+    // Upon starting the participant, the first task partition should be 
dropped and assigned anew
+    // on the instance. Then the rest of the tasks will execute and the 
workflow will complete
+    startParticipant(0);
+
+    TaskState workflowState = _driver.pollForWorkflowState(workflowName, 
TaskState.COMPLETED);
+    TaskState jobState =
+        _driver.pollForJobState(workflowName, workflowName + "_" + jobName, 
TaskState.COMPLETED);
+    Assert.assertEquals(workflowState, TaskState.COMPLETED);
+    Assert.assertEquals(jobState, TaskState.COMPLETED);
+  }
+}

Reply via email to