This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 162dc2951 Change tagged job assignment behavior (#2976)
162dc2951 is described below

commit 162dc29514d713d7eca9325563c39a75e6c8c080
Author: Junkai Xue <j...@apache.org>
AuthorDate: Fri Dec 6 13:49:15 2024 -0800

    Change tagged job assignment behavior (#2976)
    
    When a tag used for assignment is removed from an instance, the current behavior is:
    1) no new tasks with that tag are assigned to the instance.
    2) the instance's existing running tasks with that tag are dropped.
    
    This change updates the behavior to:
    1) still assign no new tasks with that tag to the instance.
    2) let the existing running tasks run to completion, as long as the instance stays live and enabled.
    
    Test added.
    
    Co-authored-by: Junkai <j...@pinterest.com>
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 10 ++-
 .../java/org/apache/helix/task/JobDispatcher.java  |  3 +-
 .../org/apache/helix/task/TestJobTagRemoval.java   | 76 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 3 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 819ee5d72..5d3364803 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -825,12 +825,18 @@ public abstract class AbstractTaskDispatcher {
           // disabled. If instance is disabled and current state still exist 
on the instance,
           // then controller needs to drop the current state, otherwise, the 
task can be marked as
           // dropped and be reassigned to other instances
-          if (disableInstances.contains(assignedParticipant)
-              && currStateOutput.getCurrentState(jobResource, new 
Partition(partitionName),
+          if (disableInstances.contains(assignedParticipant) &&
+              currStateOutput.getCurrentState(jobResource, new 
Partition(partitionName),
                   assignedParticipant) != null) {
             paMap.put(partitionNumber,
                 new PartitionAssignment(assignedParticipant, 
TaskPartitionState.DROPPED.name()));
           } else {
+            if (_manager.getHelixDataAccessor().getProperty(
+                
_manager.getHelixDataAccessor().keyBuilder().liveInstance(assignedParticipant))
+                != null && !disableInstances.contains(assignedParticipant)) {
+              continue;
+            }
+            // Only drop the task if the instance is not alive, otherwise, the 
task will be continued
             jobContext.setPartitionState(partitionNumber, 
TaskPartitionState.DROPPED);
             filteredTasks.add(partitionNumber);
           }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 6d4c687fc..d76eff7cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -240,7 +240,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
     Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
-        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, 
jobResource, tasksToDrop);
+        getCurrentInstanceToTaskAssignments(cache.getEnabledLiveInstances(), 
currStateOutput,
+            jobResource, tasksToDrop);
 
     updateInstanceToTaskAssignmentsFromContext(jobCtx, 
currentInstanceToTaskAssignments);
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java 
b/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java
new file mode 100644
index 000000000..788df9031
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java
@@ -0,0 +1,76 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestJobTagRemoval extends TaskTestBase {
+  private static final String TEST_TAG = "testTag";
+
+  /**
+   * Verifies that removing an instance's tag lets its running tagged task finish while
+   * preventing new tasks with that tag from being assigned to the instance.
+   */
+  @Test
+  public void testJobTagRemoval() throws InterruptedException {
+    String instanceName = _participants[0].getInstanceName();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    InstanceConfig cfg = configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
+    cfg.addTag(TEST_TAG);
+    configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, cfg);
+
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    taskConfigs.add(new TaskConfig.Builder().setTaskId("tagged_task")
+        .setCommand(MockTask.TASK_COMMAND).build());
+    Workflow.Builder builder = new Workflow.Builder("testWorkflow");
+    builder.addJob("JOB0", new JobConfig.Builder().addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "5000"))
+        .setInstanceGroupTag(TEST_TAG));
+    builder.addJob("JOB1", new JobConfig.Builder().addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"))
+        .setInstanceGroupTag(TEST_TAG));
+    builder.addParentChildDependency("JOB0", "JOB1");
+    _driver.start(builder.build());
+
+    // Remove the tag once JOB0 is in progress (its 5s task is expected to still be running).
+    _driver.pollForJobState("testWorkflow", "testWorkflow_JOB0", TaskState.IN_PROGRESS);
+    cfg = configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
+    cfg.removeTag(TEST_TAG);
+    configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, cfg);
+
+    // The running task must be allowed to finish on the now-untagged instance...
+    _driver.pollForJobState("testWorkflow", "testWorkflow_JOB0", TaskState.COMPLETED);
+    Assert.assertEquals(_driver.getJobContext("testWorkflow_JOB0").getPartitionState(0),
+        TaskPartitionState.COMPLETED);
+    // ...but the follow-up tagged job must not be assigned anywhere (no tagged instance left).
+    _driver.pollForJobState("testWorkflow", "testWorkflow_JOB1", TaskState.IN_PROGRESS);
+    Assert.assertNull(_driver.getJobContext("testWorkflow_JOB1").getPartitionState(0));
+  }
+}
+}

Reply via email to