This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 162dc2951 Change tagged job assignment behavior (#2976)
162dc2951 is described below
commit 162dc29514d713d7eca9325563c39a75e6c8c080
Author: Junkai Xue <[email protected]>
AuthorDate: Fri Dec 6 13:49:15 2024 -0800
Change tagged job assignment behavior (#2976)
When the tag used for task assignment is removed from an instance, the current behavior is:
1) no new tagged tasks are assigned to that instance;
2) existing running tagged tasks on that instance are dropped.
This commit changes the behavior to:
1) still assign no new tagged tasks to that instance;
2) let existing running tasks on that instance run to completion.
Test added.
Co-authored-by: Junkai <[email protected]>
---
.../apache/helix/task/AbstractTaskDispatcher.java | 10 ++-
.../java/org/apache/helix/task/JobDispatcher.java | 3 +-
.../org/apache/helix/task/TestJobTagRemoval.java | 76 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 3 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 819ee5d72..5d3364803 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -825,12 +825,18 @@ public abstract class AbstractTaskDispatcher {
// disabled. If instance is disabled and current state still exist
on the instance,
// then controller needs to drop the current state, otherwise, the
task can be marked as
// dropped and be reassigned to other instances
- if (disableInstances.contains(assignedParticipant)
- && currStateOutput.getCurrentState(jobResource, new
Partition(partitionName),
+ if (disableInstances.contains(assignedParticipant) &&
+ currStateOutput.getCurrentState(jobResource, new
Partition(partitionName),
assignedParticipant) != null) {
paMap.put(partitionNumber,
new PartitionAssignment(assignedParticipant,
TaskPartitionState.DROPPED.name()));
} else {
+ if (_manager.getHelixDataAccessor().getProperty(
+
_manager.getHelixDataAccessor().keyBuilder().liveInstance(assignedParticipant))
+ != null && !disableInstances.contains(assignedParticipant)) {
+ continue;
+ }
+ // Only drop the task if the instance is not live or is disabled; otherwise the
+ // task keeps running on its current instance.
jobContext.setPartitionState(partitionNumber,
TaskPartitionState.DROPPED);
filteredTasks.add(partitionNumber);
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 6d4c687fc..d76eff7cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -240,7 +240,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
- getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput,
jobResource, tasksToDrop);
+ getCurrentInstanceToTaskAssignments(cache.getEnabledLiveInstances(),
currStateOutput,
+ jobResource, tasksToDrop);
updateInstanceToTaskAssignmentsFromContext(jobCtx,
currentInstanceToTaskAssignments);
diff --git
a/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java
b/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java
new file mode 100644
index 000000000..788df9031
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java
@@ -0,0 +1,76 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestJobTagRemoval extends TaskTestBase {
+
+ @Test
+ public void testJobTagRemoval() throws InterruptedException {
+ String TEST_TAG = "testTag";
+
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ InstanceConfig cfg =
+ configAccessor.getInstanceConfig(CLUSTER_NAME,
_participants[0].getInstanceName());
+ cfg.addTag(TEST_TAG);
+ configAccessor.setInstanceConfig(CLUSTER_NAME,
_participants[0].getInstanceName(), cfg);
+
+ Workflow.Builder builder = new Workflow.Builder("testWorkflow");
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+ taskConfigs.add(
+ new
TaskConfig.Builder().setTaskId("tagged_task").setCommand(MockTask.TASK_COMMAND)
+ .build());
+ JobConfig.Builder jobBuilder = new
JobConfig.Builder().addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "5000"))
+ .setInstanceGroupTag(TEST_TAG);
+ JobConfig.Builder jobBuilder1 = new
JobConfig.Builder().addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"))
+ .setInstanceGroupTag(TEST_TAG);
+ builder.addJob("JOB0", jobBuilder);
+ builder.addJob("JOB1", jobBuilder1);
+ builder.addParentChildDependency("JOB0", "JOB1");
+ _driver.start(builder.build());
+
+ // Wait for the job to be created
+ _driver.pollForJobState("testWorkflow", "testWorkflow_JOB0",
TaskState.IN_PROGRESS);
+
+ // Remove the tag
+ cfg = configAccessor.getInstanceConfig(CLUSTER_NAME,
_participants[0].getInstanceName());
+ cfg.removeTag(TEST_TAG);
+ configAccessor.setInstanceConfig(CLUSTER_NAME,
_participants[0].getInstanceName(), cfg);
+
+ // Wait for the job to complete
+ _driver.pollForJobState("testWorkflow", "testWorkflow_JOB0",
TaskState.COMPLETED);
+ _driver.pollForJobState("testWorkflow", "testWorkflow_JOB1",
TaskState.IN_PROGRESS);
+ JobContext ctx = _driver.getJobContext("testWorkflow_JOB1");
+ Assert.assertEquals(ctx.getPartitionState(0), null);
+ }
+}