This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 361e18da4152c0146daa9d9dc7929f1f2bdcd9dc Author: Neal Sun <[email protected]> AuthorDate: Tue Dec 15 14:26:53 2020 -0800 Participant-side Task Current State Migration (#1584) The second part of the task current state migration. All changes made in this commit are on the participant side. --- .../java/org/apache/helix/SystemPropertyKeys.java | 3 + .../handling/HelixStateTransitionHandler.java | 27 ++++-- .../messaging/handling/HelixTaskExecutor.java | 15 +++- .../java/org/apache/helix/task/TaskRunner.java | 6 +- .../TestRoutingTableProviderFromCurrentStates.java | 95 ++++++++++++++++++++ .../task/TestCurrentStateDropWithoutConfigs.java | 4 +- .../task/TestDropCurrentStateRunningTask.java | 12 +-- .../integration/task/TestTaskCurrentStateDrop.java | 16 ++-- .../integration/task/TestTaskCurrentStateNull.java | 8 +- .../task/TestTaskCurrentStatePathDisabled.java | 100 +++++++++++++++++++++ .../task/TestTaskSchedulingTwoCurrentStates.java | 8 +- 11 files changed, 259 insertions(+), 35 deletions(-) diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java index 6a73a7e..4fe651c 100644 --- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java +++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java @@ -84,4 +84,7 @@ public class SystemPropertyKeys { MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY; public static final String STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED = "helix.StateUpdateUtil.errorLog.enabled"; + + public static final String TASK_CURRENT_STATE_PATH_DISABLED = + "helix.taskCurrentStatePathDisabled"; } diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java index bdb0d0b..d32e90e 100644 --- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -38,6 +38,7 @@ import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.MapKey; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.model.CurrentState; import org.apache.helix.model.Message; import org.apache.helix.model.Message.Attributes; @@ -45,6 +46,7 @@ import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.participant.statemachine.StateModelParser; import org.apache.helix.participant.statemachine.StateTransitionError; +import org.apache.helix.task.TaskStateModel; import org.apache.helix.util.StatusUpdateUtil; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer; @@ -86,6 +88,8 @@ public class HelixStateTransitionHandler extends MessageHandler { private final CurrentState _currentStateDelta; private final HelixManager _manager; private final StateModelFactory<? extends StateModel> _stateModelFactory; + private final boolean _isTaskMessage; + private final boolean _isTaskCurrentStatePathDisabled; volatile boolean _isTimeout = false; public HelixStateTransitionHandler(StateModelFactory<? 
extends StateModel> stateModelFactory, @@ -98,6 +102,9 @@ public class HelixStateTransitionHandler extends MessageHandler { _currentStateDelta = currentStateDelta; _manager = _notificationContext.getManager(); _stateModelFactory = stateModelFactory; + _isTaskMessage = stateModel instanceof TaskStateModel; + _isTaskCurrentStatePathDisabled = + Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED); } void preHandleMessage() throws Exception { @@ -137,8 +144,10 @@ public class HelixStateTransitionHandler extends MessageHandler { String sessionId = _message.getTgtSessionId(); String resource = _message.getResourceName(); ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize()); - PropertyKey key = accessor.keyBuilder().currentState(instance, sessionId, resource, - bucketizer.getBucketName(partitionName)); + PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? accessor.keyBuilder() + .taskCurrentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName)) + : accessor.keyBuilder() + .currentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName)); ZNRecord rec = new ZNRecord(resource); Map<String, String> map = new TreeMap<String, String>(); map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null); @@ -264,8 +273,10 @@ public class HelixStateTransitionHandler extends MessageHandler { try { // Update the ZK current state of the node - PropertyKey key = keyBuilder.currentState(instanceName, sessionId, resource, - bucketizer.getBucketName(partitionKey)); + PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? 
accessor.keyBuilder() + .taskCurrentState(instanceName, sessionId, resource, + bucketizer.getBucketName(partitionKey)) : accessor.keyBuilder() + .currentState(instanceName, sessionId, resource, bucketizer.getBucketName(partitionKey)); if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) { // normal message if (!accessor.updateProperty(key, _currentStateDelta)) { @@ -438,9 +449,11 @@ public class HelixStateTransitionHandler extends MessageHandler { disablePartition(); } - if (!accessor.updateProperty( - keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName), - currentStateDelta)) { + PropertyKey currentStateKey = + _isTaskMessage && !_isTaskCurrentStatePathDisabled ? keyBuilder + .taskCurrentState(instanceName, _message.getTgtSessionId(), resourceName) + : keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName); + if (!accessor.updateProperty(currentStateKey, currentStateDelta)) { logger.error("Fails to persist ERROR current state to ZK for resource " + resourceName + " partition: " + partition); } diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 54566cc..71ef39f 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -52,6 +52,7 @@ import org.apache.helix.NotificationContext.MapKey; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.controller.GenericHelixController; @@ -70,6 +71,7 @@ import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import 
org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.task.TaskConstants; import org.apache.helix.util.HelixUtil; import org.apache.helix.util.StatusUpdateUtil; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -830,6 +832,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { String sessionId = manager.getSessionId(); List<String> curResourceNames = accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId)); + List<String> taskCurResourceNames = + accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId)); List<PropertyKey> createCurStateKeys = new ArrayList<>(); List<CurrentState> metaCurStates = new ArrayList<>(); Set<String> createCurStateNames = new HashSet<>(); @@ -908,10 +912,15 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { if (!message.isControlerMsg() && message.getMsgType() .equals(Message.MessageType.STATE_TRANSITION.name())) { String resourceName = message.getResourceName(); - if (!curResourceNames.contains(resourceName) && !createCurStateNames - .contains(resourceName)) { + if (!curResourceNames.contains(resourceName) && !taskCurResourceNames.contains(resourceName) + && !createCurStateNames.contains(resourceName)) { createCurStateNames.add(resourceName); - createCurStateKeys.add(keyBuilder.currentState(instanceName, sessionId, resourceName)); + PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName); + if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean + .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) { + curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName); + } + createCurStateKeys.add(curStateKey); CurrentState metaCurState = new CurrentState(resourceName); 
metaCurState.setBucketSize(message.getBucketSize()); diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java index addadc8..68017c3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java @@ -22,6 +22,7 @@ package org.apache.helix.task; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.model.CurrentState; import org.apache.helix.task.TaskResult.Status; import org.slf4j.Logger; @@ -208,7 +209,10 @@ public class TaskRunner implements Runnable { String.format("Requesting a state transition to %s for partition %s.", state, partition)); try { PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - PropertyKey key = keyBuilder.currentState(instance, sessionId, resource); + PropertyKey key = + Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED) ? 
keyBuilder + .currentState(instance, sessionId, resource) + : keyBuilder.taskCurrentState(instance, sessionId, resource); CurrentState currStateDelta = new CurrentState(resource); currStateDelta.setRequestedState(partition, state.name()); diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java index 5503067..e8f4f82 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java +++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java @@ -20,14 +20,17 @@ package org.apache.helix.integration.spectator; */ import java.lang.management.ManagementFactory; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import com.google.common.collect.ImmutableMap; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; @@ -35,12 +38,14 @@ import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyType; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.TestHelper; import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.common.ZkTestBase; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.MockTask; import org.apache.helix.model.ClusterConfig; import 
org.apache.helix.model.CurrentState; import org.apache.helix.model.IdealState; @@ -49,7 +54,16 @@ import org.apache.helix.model.LiveInstance; import org.apache.helix.monitoring.mbeans.MBeanRegistrar; import org.apache.helix.monitoring.mbeans.MonitorDomainNames; import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor; +import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.spectator.RoutingTableProvider; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskConstants; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -77,9 +91,15 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); } + Map<String, TaskFactory> taskFactoryReg = new HashMap<>(); + taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new); + for (int i = 0; i < NUM_NODES; i++) { String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME, + new TaskStateModelFactory(_participants[i], taskFactoryReg)); _participants[i].syncStart(); } @@ -117,6 +137,61 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { } @Test + public void testCurrentStatesRoutingTableIgnoreTaskCurrentStates() throws Exception { + FlaggedCurrentStateRoutingTableProvider routingTableCurrentStates = + new 
FlaggedCurrentStateRoutingTableProvider(_manager); + Assert.assertFalse(routingTableCurrentStates.isOnStateChangeTriggered()); + + try { + TaskDriver taskDriver = new TaskDriver(_manager); + String workflowName1 = TestHelper.getTestMethodName() + "_1"; + String jobName = "JOB0"; + + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(NUM_NODES) + .setNumConcurrentTasksPerInstance(1).setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000")); + + Workflow.Builder workflowBuilder1 = + new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder); + taskDriver.start(workflowBuilder1.build()); + taskDriver + .pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName), + TaskState.COMPLETED); + + Assert.assertFalse(routingTableCurrentStates.isOnStateChangeTriggered()); + + // Disable the task current path and the routing table provider should be notified + System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "true"); + String workflowName2 = TestHelper.getTestMethodName() + "_2"; + Workflow.Builder workflowBuilder2 = + new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder); + taskDriver.start(workflowBuilder2.build()); + taskDriver + .pollForJobState(workflowName2, TaskUtil.getNamespacedJobName(workflowName2, jobName), + TaskState.COMPLETED); + + Assert.assertTrue(routingTableCurrentStates.isOnStateChangeTriggered()); + System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "false"); + + String dbName = "testDB"; + _gSetupTool.addResourceToCluster(CLUSTER_NAME, dbName, NUM_PARTITIONS, "MasterSlave", + IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName()); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, NUM_REPLICAS); + + ZkHelixClusterVerifier clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient) + 
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) + .build(); + Assert.assertTrue(clusterVerifier.verifyByPolling()); + Assert.assertTrue(routingTableCurrentStates.isOnStateChangeTriggered()); + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, dbName); + } finally { + routingTableCurrentStates.shutdown(); + } + } + + @Test (dependsOnMethods = "testCurrentStatesRoutingTableIgnoreTaskCurrentStates") public void testRoutingTableWithCurrentStates() throws Exception { RoutingTableProvider routingTableEV = new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW); @@ -365,4 +440,24 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase { super.onLiveInstanceChange(liveInstances, changeContext); } } + + static class FlaggedCurrentStateRoutingTableProvider extends RoutingTableProvider { + private boolean onStateChangeTriggered = false; + + public FlaggedCurrentStateRoutingTableProvider(HelixManager manager) { + super(manager, PropertyType.CURRENTSTATES); + } + + public boolean isOnStateChangeTriggered() { + return onStateChangeTriggered; + } + + @Override + @PreFetch(enabled = false) + public void onStateChange(String instanceName, List<CurrentState> statesInfo, + NotificationContext changeContext) { + onStateChangeTriggered = true; + super.onStateChange(instanceName, statesInfo, changeContext); + } + } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java index 8a19904..346dd7a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java @@ -52,11 +52,11 @@ public class TestCurrentStateDropWithoutConfigs extends TaskTestBase { currentState.setStartTime(taskName, System.currentTimeMillis()); 
currentState.setEndTime(taskName, System.currentTimeMillis()); _accessor.setProperty(_accessor.keyBuilder() - .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(), + .taskCurrentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(), jobName), currentState); Assert.assertTrue(TestHelper.verify(() -> _accessor.getProperty(_accessor.keyBuilder() - .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(), + .taskCurrentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(), jobName)) == null, TestHelper.WAIT_DURATION * 10)); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java index c932634..f8f0cd1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java @@ -98,8 +98,8 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase { String instanceP2 = PARTICIPANT_PREFIX + "_" + (_startPort + 2); ZkClient clientP2 = (ZkClient) _participants[2].getZkClient(); String sessionIdP2 = ZkTestHelper.getSessionId(clientP2); - String currentStatePathP2 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP2 + "/CURRENTSTATES/" - + sessionIdP2 + "/" + namespacedJobName; + String currentStatePathP2 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP2, sessionIdP2, namespacedJobName).toString(); Assert .assertTrue( @@ -113,14 +113,14 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase { String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0); ZkClient clientP0 = (ZkClient) _participants[0].getZkClient(); String sessionIdP0 = ZkTestHelper.getSessionId(clientP0); - String currentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + 
instanceP0 + "/CURRENTSTATES/" - + sessionIdP0 + "/" + namespacedJobName; + String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString(); String instanceP1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1); ZkClient clientP1 = (ZkClient) _participants[1].getZkClient(); String sessionIdP1 = ZkTestHelper.getSessionId(clientP1); - String currentStatePathP1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP1 + "/CURRENTSTATES/" - + sessionIdP1 + "/" + namespacedJobName; + String currentStatePathP1 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP1, sessionIdP1, namespacedJobName).toString(); ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(currentStatePathP2, new Stat(), AccessOption.PERSISTENT); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java index 627a7b9..8ca89e1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java @@ -88,10 +88,10 @@ public class TestTaskCurrentStateDrop extends TaskTestBase { String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0); ZkClient clientP0 = (ZkClient) _participants[0].getZkClient(); String sessionIdP0 = ZkTestHelper.getSessionId(clientP0); - String taskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 - + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName; - String dataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 - + "/CURRENTSTATES/" + sessionIdP0 + "/" + DATABASE; + String taskCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString(); + String dataBaseCurrentStatePathP0 = 
_manager.getHelixDataAccessor().keyBuilder() + .currentState(instanceP0, sessionIdP0, DATABASE).toString(); // Read the current states of Participant0 and make sure they been created boolean isCurrentStateCreated = TestHelper.verify(() -> { @@ -113,10 +113,10 @@ public class TestTaskCurrentStateDrop extends TaskTestBase { clientP0 = (ZkClient) _participants[0].getZkClient(); String newSessionIdP0 = ZkTestHelper.getSessionId(clientP0); - String newTaskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 - + "/CURRENTSTATES/" + newSessionIdP0 + "/" + namespacedJobName; - String newDataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 - + "/CURRENTSTATES/" + newSessionIdP0 + "/" + DATABASE; + String newTaskCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP0, newSessionIdP0, namespacedJobName).toString(); + String newDataBaseCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder() + .currentState(instanceP0, newSessionIdP0, DATABASE).toString(); boolean isCurrentStateExpected = TestHelper.verify(() -> { ZNRecord taskRecord = _manager.getHelixDataAccessor().getBaseDataAccessor() diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java index 720e7a3..1efb01e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java @@ -88,10 +88,10 @@ public class TestTaskCurrentStateNull extends TaskTestBase { String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0); ZkClient clientP0 = (ZkClient) _participants[0].getZkClient(); String sessionIdP0 = ZkTestHelper.getSessionId(clientP0); - String jobCurrentStatePath1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 - + "/CURRENTSTATES/" + sessionIdP0 + "/" + 
namespacedJobName1; - String jobCurrentStatePath2 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 - + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName2; + String jobCurrentStatePath1 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName1).toString(); + String jobCurrentStatePath2 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName2).toString(); // Read the current states of Participant0 and make sure they have been created boolean isCurrentStateCreated = TestHelper.verify(() -> { diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java new file mode 100644 index 0000000..8882a09 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java @@ -0,0 +1,100 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkTestHelper; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * This test makes sure that task current states are written to the task current state path by + * default, and to the regular current state path when the task current state path is disabled. + */ +public class TestTaskCurrentStatePathDisabled extends TaskTestBase { + private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB; + protected HelixDataAccessor _accessor; + + @BeforeClass + public void beforeClass() throws Exception { + _numPartitions = 1; + _numNodes = 1; + super.beforeClass(); + } + + @Test + public void testTaskCurrentStatePathDisabled() throws Exception { + String jobQueueName0 = TestHelper.getTestMethodName() + "_0"; + JobConfig.Builder jobBuilder0 = + new JobConfig.Builder().setWorkflow(jobQueueName0).setTargetResource(DATABASE) + .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name())) + .setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000")); + JobQueue.Builder jobQueue0 = TaskTestUtil.buildJobQueue(jobQueueName0); + jobQueue0.enqueueJob("JOB0", jobBuilder0); + + _driver.start(jobQueue0.build()); + String namespacedJobName0 = TaskUtil.getNamespacedJobName(jobQueueName0, "JOB0"); + _driver.pollForJobState(jobQueueName0, namespacedJobName0, TaskState.IN_PROGRESS); + + // Get the current states of Participant0 + String instanceP0 = PARTICIPANT_PREFIX + "_" + 
(_startPort + 0); + ZkClient clientP0 = (ZkClient) _participants[0].getZkClient(); + String sessionIdP0 = ZkTestHelper.getSessionId(clientP0); + PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder(); + Assert.assertNotNull(_manager.getHelixDataAccessor() + .getProperty(keyBuilder.taskCurrentState(instanceP0, sessionIdP0, namespacedJobName0))); + Assert.assertNull(_manager.getHelixDataAccessor() + .getProperty(keyBuilder.currentState(instanceP0, sessionIdP0, namespacedJobName0))); + + // Test the case when the task current state path is disabled + String jobQueueName1 = TestHelper.getTestMethodName() + "_1"; + JobConfig.Builder jobBuilder1 = + new JobConfig.Builder().setWorkflow(jobQueueName1).setTargetResource(DATABASE) + .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name())) + .setCommand(MockTask.TASK_COMMAND) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000")); + JobQueue.Builder jobQueue1 = TaskTestUtil.buildJobQueue(jobQueueName1); + jobQueue1.enqueueJob("JOB1", jobBuilder1); + + System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "true"); + _driver.start(jobQueue1.build()); + String namespacedJobName1 = TaskUtil.getNamespacedJobName(jobQueueName1, "JOB1"); + _driver.pollForJobState(jobQueueName1, namespacedJobName1, TaskState.IN_PROGRESS); + Assert.assertNull(_manager.getHelixDataAccessor() + .getProperty(keyBuilder.taskCurrentState(instanceP0, sessionIdP0, namespacedJobName1))); + Assert.assertNotNull(_manager.getHelixDataAccessor() + .getProperty(keyBuilder.currentState(instanceP0, sessionIdP0, namespacedJobName1))); + System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "false"); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java index bb970c7..ada5157 100644 --- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java @@ -158,15 +158,15 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase { String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0); ZkClient clientP0 = (ZkClient) _participants[0].getZkClient(); String sessionIdP0 = ZkTestHelper.getSessionId(clientP0); - String currentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/" - + sessionIdP0 + "/" + namespacedJobName; + String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString(); // Get the current state of Participant1 String instanceP1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1); ZkClient clientP1 = (ZkClient) _participants[1].getZkClient(); String sessionIdP1 = ZkTestHelper.getSessionId(clientP1); - String currentStatePathP1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP1 + "/CURRENTSTATES/" - + sessionIdP1 + "/" + namespacedJobName; + String currentStatePathP1 = _manager.getHelixDataAccessor().keyBuilder() + .taskCurrentState(instanceP1, sessionIdP1, namespacedJobName).toString(); boolean isCurrentStateCreated = TestHelper.verify(() -> { ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
