This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new bbb0554 Remove task requested state (#1723)
bbb0554 is described below
commit bbb0554f696a77f04f5f15adbd148b5f6bbe172f
Author: xyuanlu <[email protected]>
AuthorDate: Tue Jun 1 10:34:03 2021 -0700
Remove task requested state (#1723)
Remove requested state update in task framework
---
.../handling/HelixStateTransitionHandler.java | 150 ++++++++-------------
.../messaging/handling/HelixTaskExecutor.java | 1 -
.../helix/participant/statemachine/StateModel.java | 4 +-
.../java/org/apache/helix/task/TaskRunner.java | 52 ++++---
.../java/org/apache/helix/task/TaskStateModel.java | 2 +-
.../helix/integration/task/TestStopWorkflow.java | 2 +-
6 files changed, 91 insertions(+), 120 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 590064d..0d67ced 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -24,11 +24,8 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
@@ -123,9 +120,6 @@ public class HelixStateTransitionHandler extends
MessageHandler {
+ _message.getPartitionNames() + " from:" + _message.getFromState() +
" to:"
+ _message.getToState() + ", relayedFrom: " +
_message.getRelaySrcHost());
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- String partitionName = _message.getPartitionName();
-
// Set start time right before invoke client logic
_currentStateDelta.setStartTime(_message.getPartitionName(),
System.currentTimeMillis());
@@ -138,67 +132,21 @@ public class HelixStateTransitionHandler extends
MessageHandler {
throw err.exception;
}
- // Reset the REQUESTED_STATE property if it exists.
- try {
- String instance = _manager.getInstanceName();
- String sessionId = _message.getTgtSessionId();
- String resource = _message.getResourceName();
- ZNRecordBucketizer bucketizer = new
ZNRecordBucketizer(_message.getBucketSize());
- PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ?
accessor.keyBuilder()
- .taskCurrentState(instance, sessionId, resource,
bucketizer.getBucketName(partitionName))
- : accessor.keyBuilder()
- .currentState(instance, sessionId, resource,
bucketizer.getBucketName(partitionName));
- ZNRecord rec = new ZNRecord(resource);
- Map<String, String> map = new TreeMap<String, String>();
- map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
- rec.getMapFields().put(partitionName, map);
- ZNRecordDelta delta = new ZNRecordDelta(rec,
ZNRecordDelta.MergeOperation.SUBTRACT);
- List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
- deltaList.add(delta);
- CurrentState currStateUpdate = new CurrentState(resource);
- currStateUpdate.setDeltaList(deltaList);
-
- // Update the ZK current state of the node
- if (!accessor.updateProperty(key, currStateUpdate)) {
- logger.error("Fails to persist current state back to ZK for resource "
+ resource
- + " partition: " + partitionName);
- }
- } catch (Exception e) {
- logger.error("Error when removing " +
CurrentState.CurrentStateProperty.REQUESTED_STATE.name()
- + " from current state.", e);
- StateTransitionError error =
- new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
- _stateModel.rollbackOnError(_message, _notificationContext, error);
- _statusUpdateUtil.logError(
- _message, HelixStateTransitionHandler.class, e, "Error when removing
"
- + CurrentState.CurrentStateProperty.REQUESTED_STATE.name() + "
from current state.",
- _manager);
- }
}
void postHandleMessage() {
HelixTaskResult taskResult =
(HelixTaskResult)
_notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
Exception exception = taskResult.getException();
-
String partitionKey = _message.getPartitionName();
- String resource = _message.getResourceName();
- String sessionId = _message.getTgtSessionId();
- String instanceName = _manager.getInstanceName();
-
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- int bucketSize = _message.getBucketSize();
- ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
// No need to sync on manager, we are cancel executor in expiry session
before start executor in
// new session
// sessionId might change when we update the state model state.
// for zk current state it is OK as we have the per-session current state
node
if (!_message.getTgtSessionId().equals(_manager.getSessionId())) {
- logger.warn("Session id has changed. Skip postExecutionMessage. Old
session "
- + _message.getExecutionSessionId() + " , new session : " +
_manager.getSessionId());
+ logger.warn("Session id has changed. Skip postExecutionMessage. Old
session " + _message
+ .getExecutionSessionId() + " , new session : " +
_manager.getSessionId());
return;
}
@@ -208,17 +156,14 @@ public class HelixStateTransitionHandler extends
MessageHandler {
_currentStateDelta.setPreviousState(partitionKey, _message.getFromState());
// add host name this state transition is triggered by.
- if
(Message.MessageType.RELAYED_MESSAGE.name().equals(_message.getMsgSubType())) {
- _currentStateDelta.setTriggerHost(partitionKey,
_message.getRelaySrcHost());
- } else {
- _currentStateDelta.setTriggerHost(partitionKey, _message.getMsgSrc());
- }
+ _currentStateDelta.setTriggerHost(partitionKey,
+
Message.MessageType.RELAYED_MESSAGE.name().equals(_message.getMsgSubType()) ?
+ _message.getRelaySrcHost() : _message.getMsgSrc());
if (taskResult.isSuccess()) {
// String fromState = message.getFromState();
String toState = _message.getToState();
_currentStateDelta.setState(partitionKey, toState);
-
if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
// for "OnOfflineToDROPPED" message, we need to remove the resource
key record
// from the current state of the instance because the resource key is
dropped.
@@ -227,47 +172,64 @@ public class HelixStateTransitionHandler extends
MessageHandler {
rec.getMapFields().put(partitionKey, null);
ZNRecordDelta delta = new ZNRecordDelta(rec, MergeOperation.SUBTRACT);
- List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+ List<ZNRecordDelta> deltaList = new ArrayList<>();
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
- _stateModelFactory.removeStateModel(resource, partitionKey);
- } else {
+ _stateModelFactory.removeStateModel(_message.getResourceName(),
partitionKey);
+ } else if
(_stateModel.getCurrentState().equals(_message.getFromState())) {
// if the partition is not to be dropped, update _stateModel to the
TO_STATE
+ // need this check because TaskRunner may change _stateModel before
reach here.
_stateModel.updateState(toState);
}
- } else if (taskResult.isCancelled()) {
- // Cancelled message does not need current state update
- return;
- } else {
+ } else if (!taskResult.isCancelled()) {
if (exception instanceof HelixStateMismatchException) {
// if fromState mismatch, set current state on zk to stateModel's
current state
logger.warn("Force CurrentState on Zk to be stateModel's CurrentState.
partitionKey: "
+ partitionKey + ", currentState: " +
_stateModel.getCurrentState() + ", message: "
+ _message);
_currentStateDelta.setState(partitionKey,
_stateModel.getCurrentState());
- } else {
- StateTransitionError error =
- new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR,
exception);
- if (exception instanceof InterruptedException) {
- if (_isTimeout) {
- error = new StateTransitionError(ErrorType.INTERNAL,
ErrorCode.TIMEOUT, exception);
- } else {
- // State transition interrupted but not caused by timeout. Keep
the current
- // state in this case
- logger.error(
- "State transition interrupted but not timeout. Not updating
state. Partition : "
- + _message.getPartitionName() + " MsgId : " +
_message.getMsgId());
- return;
- }
- }
- _stateModel.rollbackOnError(_message, _notificationContext, error);
- _currentStateDelta.setState(partitionKey,
HelixDefinedState.ERROR.toString());
- _stateModel.updateState(HelixDefinedState.ERROR.toString());
+ updateZKCurrentState();
+ return;
}
- }
+ // Handle timeout interrupt.
+ if ((exception instanceof InterruptedException) && !_isTimeout) {
+ // State transition interrupted but not caused by timeout. Keep the
current
+ // state in this case
+ logger.error(
+ "State transition interrupted but not timeout. Not updating state.
Partition : "
+ + _message.getPartitionName() + " MsgId : " +
_message.getMsgId());
+ return;
+ }
+ // We did early return when Timeout interrupt.
+ StateTransitionError error = new StateTransitionError(ErrorType.INTERNAL,
+ exception instanceof InterruptedException ? ErrorCode.TIMEOUT :
ErrorCode.ERROR,
+ exception);
+ _stateModel.rollbackOnError(_message, _notificationContext, error);
+ _currentStateDelta.setState(partitionKey,
HelixDefinedState.ERROR.toString());
+ _stateModel.updateState(HelixDefinedState.ERROR.toString());
+ } // NOP if taskResult.isCancelled()
+
+ updateZKCurrentState();
+ }
+
+ // Update the ZK current state of the node
+ private void updateZKCurrentState(){
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ String partitionKey = _message.getPartitionName();
+ String resource = _message.getResourceName();
+ String sessionId = _message.getTgtSessionId();
+ String instanceName = _manager.getInstanceName();
try {
- // Update the ZK current state of the node
+ // We did not update _stateModel for DROPPED state, so it won't match
_stateModel.
+ if
(!_stateModel.getCurrentState().equals(_currentStateDelta.getState(partitionKey))
+ && !_currentStateDelta.getState(partitionKey)
+ .equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+ logger.warn("_stateModel is already updated by TaskRunner. Skip ZK
update in StateTransitionHandler");
+ return;
+ }
+ int bucketSize = _message.getBucketSize();
+ ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ?
accessor.keyBuilder()
.taskCurrentState(instanceName, sessionId, resource,
bucketizer.getBucketName(partitionKey)) : accessor.keyBuilder()
@@ -275,8 +237,9 @@ public class HelixStateTransitionHandler extends
MessageHandler {
if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
// normal message
if (!accessor.updateProperty(key, _currentStateDelta)) {
- throw new HelixException("Fails to persist current state back to ZK
for resource "
- + resource + " partition: " + _message.getPartitionName());
+ throw new HelixException(
+ "Fails to persist current state back to ZK for resource " +
resource + " partition: "
+ + _message.getPartitionName());
}
} else {
// sub-message of a batch message
@@ -293,6 +256,8 @@ public class HelixStateTransitionHandler extends
MessageHandler {
_statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class,
e,
"Error when update current-state ", _manager);
}
+
+
}
@Override
@@ -347,12 +312,12 @@ public class HelixStateTransitionHandler extends
MessageHandler {
taskResult.setInterrupted(e instanceof InterruptedException);
}
}
-
taskResult.setCompleteTime(System.currentTimeMillis());
// add task result to context for postHandling
context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
- postHandleMessage();
-
+ synchronized (_stateModel) {
+ postHandleMessage();
+ }
return taskResult;
}
@@ -375,7 +340,6 @@ public class HelixStateTransitionHandler extends
MessageHandler {
"Instance %s, partition %s received state transition from %s to %s
on session %s, message id: %s",
message.getTgtName(), message.getPartitionName(),
message.getFromState(),
message.getToState(), message.getTgtSessionId(),
message.getMsgId()));
-
if (_cancelled) {
throw new HelixRollbackException(String.format(
"Instance %s, partition %s state transition from %s to %s on
session %s has been cancelled, message id: %s",
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 71ef39f..0e8aace 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -844,7 +844,6 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
// skip the following operations for the no-op messages.
continue;
}
-
NotificationContext msgWorkingContext = changeContext.clone();
MessageHandler msgHandler = null;
try {
diff --git
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index 3a218b8..2c88a68 100644
---
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -39,8 +39,8 @@ public abstract class StateModel {
// @transition(from='from', to='to')
public void defaultTransitionHandler() {
- logger
- .error("Default default handler. The idea is to invoke this if no
transition method is found. Yet to be implemented");
+ logger.error(
+ "Default default handler. The idea is to invoke this if no transition
method is found. Yet to be implemented");
}
public boolean updateState(String newState) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 68017c3..794182d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -52,16 +52,18 @@ public class TaskRunner implements Runnable {
private volatile boolean _timeout = false;
// If true, indicates that the task has finished.
private volatile boolean _done = false;
+ private TaskStateModel _stateModel;
public TaskRunner(Task task, String taskName, String taskPartition, String
instance,
- HelixManager manager, String sessionId) {
+ HelixManager manager, String sessionId, TaskStateModel stateModel) {
_task = task;
_taskName = taskName;
_taskPartition = taskPartition;
_instance = instance;
_manager = manager;
_sessionId = sessionId;
+ _stateModel = stateModel;
}
@Override
@@ -79,22 +81,22 @@ public class TaskRunner implements Runnable {
switch (_result.getStatus()) {
case COMPLETED:
- requestStateTransition(TaskPartitionState.COMPLETED);
+ updateCurrentState(TaskPartitionState.COMPLETED);
break;
case CANCELED:
if (_timeout) {
- requestStateTransition(TaskPartitionState.TIMED_OUT);
+ updateCurrentState(TaskPartitionState.TIMED_OUT);
}
// Else the state transition to CANCELED was initiated by the
controller.
break;
case ERROR:
- requestStateTransition(TaskPartitionState.TASK_ERROR);
+ updateCurrentState(TaskPartitionState.TASK_ERROR);
break;
case FAILED:
- requestStateTransition(TaskPartitionState.TASK_ERROR);
+ updateCurrentState(TaskPartitionState.TASK_ERROR);
break;
case FATAL_FAILED:
- requestStateTransition(TaskPartitionState.TASK_ABORTED);
+ updateCurrentState(TaskPartitionState.TASK_ABORTED);
break;
default:
throw new AssertionError("Unknown task result type: " +
_result.getStatus().name());
@@ -103,7 +105,7 @@ public class TaskRunner implements Runnable {
LOG.error("Problem running the task, report task as FAILED.", e);
_result =
new TaskResult(Status.FAILED, "Exception happened in running task: "
+ e.getMessage());
- requestStateTransition(TaskPartitionState.TASK_ERROR);
+ updateCurrentState(TaskPartitionState.TASK_ERROR);
} finally {
synchronized (_doneSync) {
_done = true;
@@ -180,15 +182,15 @@ public class TaskRunner implements Runnable {
* Requests the controller for a state transition.
* @param state The state transition that is being requested.
*/
- private void requestStateTransition(TaskPartitionState state) {
- boolean success =
- setRequestedState(_manager.getHelixDataAccessor(), _instance,
_sessionId, _taskName,
- _taskPartition, state);
- if (!success) {
- LOG.error(String
- .format(
- "Failed to set the requested state to %s for instance %s,
session id %s, task partition %s.",
- state, _instance, _sessionId, _taskPartition));
+ private void updateCurrentState(TaskPartitionState state) {
+ synchronized (_stateModel) {
+ _stateModel.updateState(state.name());
+ if (!setZKCurrentState(_manager.getHelixDataAccessor(), _instance,
_sessionId, _taskName,
+ _taskPartition, state)) {
+ LOG.error(String.format(
+ "Failed to set the requested state to %s for instance %s, session
id %s, task partition %s.",
+ state, _instance, _sessionId, _taskPartition));
+ }
}
}
@@ -203,23 +205,29 @@ public class TaskRunner implements Runnable {
* @param state the requested state
* @return true if the request was persisted, false otherwise
*/
- private static boolean setRequestedState(HelixDataAccessor accessor, String
instance,
+ private boolean setZKCurrentState(HelixDataAccessor accessor, String
instance,
String sessionId, String resource, String partition, TaskPartitionState
state) {
LOG.debug(
- String.format("Requesting a state transition to %s for partition %s.",
state, partition));
+ String.format("Updating current state %s for partition %s.", state,
partition));
try {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
PropertyKey key =
Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED) ?
keyBuilder
.currentState(instance, sessionId, resource)
: keyBuilder.taskCurrentState(instance, sessionId, resource);
- CurrentState currStateDelta = new CurrentState(resource);
- currStateDelta.setRequestedState(partition, state.name());
- return accessor.updateProperty(key, currStateDelta);
+ String prevState = _stateModel.getCurrentState();
+ CurrentState currentStateDelta = new CurrentState(resource);
+ currentStateDelta.setSessionId(sessionId);
+ currentStateDelta.setStateModelDefRef(TaskConstants.STATE_MODEL_NAME);
+ currentStateDelta.setState(partition, state.name());
+ currentStateDelta.setInfo(partition, _result.getInfo());
+ currentStateDelta.setPreviousState(partition, prevState);
+
+ return accessor.updateProperty(key, currentStateDelta);
} catch (Exception e) {
LOG.error(String
- .format("Error when requesting a state transition to %s for
partition %s.", state,
+ .format("Error when updating current state to %s for partition
%s.", state,
partition), e);
return false;
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 3771a57..fbac9dd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -327,7 +327,7 @@ public class TaskStateModel extends StateModel {
// Submit the task for execution
_taskRunner =
new TaskRunner(task, msg.getResourceName(), taskPartition,
msg.getTgtName(), _manager,
- msg.getTgtSessionId());
+ msg.getTgtSessionId(), this);
_taskExecutor.submit(_taskRunner);
_taskRunner.waitTillStarted();
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
index 08dc776..51b03bf 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -323,7 +323,7 @@ public class TestStopWorkflow extends TaskTestBase {
e.printStackTrace();
}
- return new TaskResult(TaskResult.Status.COMPLETED, "");
+ return new TaskResult(TaskResult.Status.CANCELED, "");
}
@Override