This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new 162dc2951 Change tagged job assignment behavior (#2976) 162dc2951 is described below commit 162dc29514d713d7eca9325563c39a75e6c8c080 Author: Junkai Xue <j...@apache.org> AuthorDate: Fri Dec 6 13:49:15 2024 -0800 Change tagged job assignment behavior (#2976) Once a tag is removed from an instance for assignment purposes, the current behavior is: 1) no new tagged tasks are assigned to that instance; 2) the existing running tasks are dropped. This change modifies the behavior as follows: 1) no new tagged tasks are assigned to that instance (unchanged); 2) the existing running tasks are allowed to run to completion. Test added. Co-authored-by: Junkai <j...@pinterest.com> --- .../apache/helix/task/AbstractTaskDispatcher.java | 10 ++- .../java/org/apache/helix/task/JobDispatcher.java | 3 +- .../org/apache/helix/task/TestJobTagRemoval.java | 76 ++++++++++++++++++++++ 3 files changed, 86 insertions(+), 3 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index 819ee5d72..5d3364803 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -825,12 +825,18 @@ public abstract class AbstractTaskDispatcher { // disabled.
If instance is disabled and current state still exist on the instance, // then controller needs to drop the current state, otherwise, the task can be marked as // dropped and be reassigned to other instances - if (disableInstances.contains(assignedParticipant) - && currStateOutput.getCurrentState(jobResource, new Partition(partitionName), + if (disableInstances.contains(assignedParticipant) && + currStateOutput.getCurrentState(jobResource, new Partition(partitionName), assignedParticipant) != null) { paMap.put(partitionNumber, new PartitionAssignment(assignedParticipant, TaskPartitionState.DROPPED.name())); } else { + if (_manager.getHelixDataAccessor().getProperty( + _manager.getHelixDataAccessor().keyBuilder().liveInstance(assignedParticipant)) + != null && !disableInstances.contains(assignedParticipant)) { + continue; + } + // Only drop the task if the instance is not alive, otherwise, the task will be continued jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED); filteredTasks.add(partitionNumber); } diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 6d4c687fc..d76eff7cd 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -240,7 +240,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { Map<String, Set<Integer>> tasksToDrop = new HashMap<>(); Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments = - getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource, tasksToDrop); + getCurrentInstanceToTaskAssignments(cache.getEnabledLiveInstances(), currStateOutput, + jobResource, tasksToDrop); updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments); diff --git a/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java 
b/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java new file mode 100644 index 000000000..788df9031 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestJobTagRemoval.java @@ -0,0 +1,76 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.model.InstanceConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestJobTagRemoval extends TaskTestBase { + + @Test + public void testJobTagRemoval() throws InterruptedException { + String TEST_TAG = "testTag"; + + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + InstanceConfig cfg = + configAccessor.getInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName()); + cfg.addTag(TEST_TAG); + configAccessor.setInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName(), cfg); + + Workflow.Builder builder = new Workflow.Builder("testWorkflow"); + List<TaskConfig> taskConfigs = new ArrayList<>(); + taskConfigs.add( + new TaskConfig.Builder().setTaskId("tagged_task").setCommand(MockTask.TASK_COMMAND) + .build()); + JobConfig.Builder jobBuilder = new JobConfig.Builder().addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "5000")) + .setInstanceGroupTag(TEST_TAG); + JobConfig.Builder jobBuilder1 = new JobConfig.Builder().addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000")) + .setInstanceGroupTag(TEST_TAG); + builder.addJob("JOB0", jobBuilder); + builder.addJob("JOB1", jobBuilder1); + builder.addParentChildDependency("JOB0", "JOB1"); + _driver.start(builder.build()); + + // Wait for the job to be created + _driver.pollForJobState("testWorkflow", "testWorkflow_JOB0", TaskState.IN_PROGRESS); + + // Remove the tag + cfg = configAccessor.getInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName()); + cfg.removeTag(TEST_TAG); + configAccessor.setInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName(), cfg); + + // Wait for the job to 
complete + _driver.pollForJobState("testWorkflow", "testWorkflow_JOB0", TaskState.COMPLETED); + _driver.pollForJobState("testWorkflow", "testWorkflow_JOB1", TaskState.IN_PROGRESS); + JobContext ctx = _driver.getJobContext("testWorkflow_JOB1"); + Assert.assertEquals(ctx.getPartitionState(0), null); + } +}