This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new bbb0554  Remove task requested state (#1723)
bbb0554 is described below

commit bbb0554f696a77f04f5f15adbd148b5f6bbe172f
Author: xyuanlu <[email protected]>
AuthorDate: Tue Jun 1 10:34:03 2021 -0700

    Remove task requested state (#1723)
    
    Remove requested state update in task framework
---
 .../handling/HelixStateTransitionHandler.java      | 150 ++++++++-------------
 .../messaging/handling/HelixTaskExecutor.java      |   1 -
 .../helix/participant/statemachine/StateModel.java |   4 +-
 .../java/org/apache/helix/task/TaskRunner.java     |  52 ++++---
 .../java/org/apache/helix/task/TaskStateModel.java |   2 +-
 .../helix/integration/task/TestStopWorkflow.java   |   2 +-
 6 files changed, 91 insertions(+), 120 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 590064d..0d67ced 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -24,11 +24,8 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
@@ -123,9 +120,6 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
         + _message.getPartitionNames() + " from:" + _message.getFromState() + 
" to:"
         + _message.getToState() + ", relayedFrom: " + 
_message.getRelaySrcHost());
 
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    String partitionName = _message.getPartitionName();
-
     // Set start time right before invoke client logic
     _currentStateDelta.setStartTime(_message.getPartitionName(), 
System.currentTimeMillis());
 
@@ -138,67 +132,21 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
       throw err.exception;
     }
 
-    // Reset the REQUESTED_STATE property if it exists.
-    try {
-      String instance = _manager.getInstanceName();
-      String sessionId = _message.getTgtSessionId();
-      String resource = _message.getResourceName();
-      ZNRecordBucketizer bucketizer = new 
ZNRecordBucketizer(_message.getBucketSize());
-      PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? 
accessor.keyBuilder()
-          .taskCurrentState(instance, sessionId, resource, 
bucketizer.getBucketName(partitionName))
-          : accessor.keyBuilder()
-              .currentState(instance, sessionId, resource, 
bucketizer.getBucketName(partitionName));
-      ZNRecord rec = new ZNRecord(resource);
-      Map<String, String> map = new TreeMap<String, String>();
-      map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
-      rec.getMapFields().put(partitionName, map);
-      ZNRecordDelta delta = new ZNRecordDelta(rec, 
ZNRecordDelta.MergeOperation.SUBTRACT);
-      List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
-      deltaList.add(delta);
-      CurrentState currStateUpdate = new CurrentState(resource);
-      currStateUpdate.setDeltaList(deltaList);
-
-      // Update the ZK current state of the node
-      if (!accessor.updateProperty(key, currStateUpdate)) {
-        logger.error("Fails to persist current state back to ZK for resource " 
+ resource
-            + " partition: " + partitionName);
-      }
-    } catch (Exception e) {
-      logger.error("Error when removing " + 
CurrentState.CurrentStateProperty.REQUESTED_STATE.name()
-          + " from current state.", e);
-      StateTransitionError error =
-          new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
-      _stateModel.rollbackOnError(_message, _notificationContext, error);
-      _statusUpdateUtil.logError(
-          _message, HelixStateTransitionHandler.class, e, "Error when removing 
"
-              + CurrentState.CurrentStateProperty.REQUESTED_STATE.name() + " 
from current state.",
-          _manager);
-    }
   }
 
   void postHandleMessage() {
     HelixTaskResult taskResult =
         (HelixTaskResult) 
_notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
     Exception exception = taskResult.getException();
-
     String partitionKey = _message.getPartitionName();
-    String resource = _message.getResourceName();
-    String sessionId = _message.getTgtSessionId();
-    String instanceName = _manager.getInstanceName();
-
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    int bucketSize = _message.getBucketSize();
-    ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
 
     // No need to sync on manager, we are cancel executor in expiry session 
before start executor in
     // new session
     // sessionId might change when we update the state model state.
     // for zk current state it is OK as we have the per-session current state 
node
     if (!_message.getTgtSessionId().equals(_manager.getSessionId())) {
-      logger.warn("Session id has changed. Skip postExecutionMessage. Old 
session "
-          + _message.getExecutionSessionId() + " , new session : " + 
_manager.getSessionId());
+      logger.warn("Session id has changed. Skip postExecutionMessage. Old 
session " + _message
+          .getExecutionSessionId() + " , new session : " + 
_manager.getSessionId());
       return;
     }
 
@@ -208,17 +156,14 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
     _currentStateDelta.setPreviousState(partitionKey, _message.getFromState());
 
     // add host name this state transition is triggered by.
-    if 
(Message.MessageType.RELAYED_MESSAGE.name().equals(_message.getMsgSubType())) {
-      _currentStateDelta.setTriggerHost(partitionKey, 
_message.getRelaySrcHost());
-    } else {
-      _currentStateDelta.setTriggerHost(partitionKey, _message.getMsgSrc());
-    }
+    _currentStateDelta.setTriggerHost(partitionKey,
+        
Message.MessageType.RELAYED_MESSAGE.name().equals(_message.getMsgSubType()) ?
+            _message.getRelaySrcHost() : _message.getMsgSrc());
 
     if (taskResult.isSuccess()) {
       // String fromState = message.getFromState();
       String toState = _message.getToState();
       _currentStateDelta.setState(partitionKey, toState);
-
       if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
         // for "OnOfflineToDROPPED" message, we need to remove the resource 
key record
         // from the current state of the instance because the resource key is 
dropped.
@@ -227,47 +172,64 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
         rec.getMapFields().put(partitionKey, null);
         ZNRecordDelta delta = new ZNRecordDelta(rec, MergeOperation.SUBTRACT);
 
-        List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+        List<ZNRecordDelta> deltaList = new ArrayList<>();
         deltaList.add(delta);
         _currentStateDelta.setDeltaList(deltaList);
-        _stateModelFactory.removeStateModel(resource, partitionKey);
-      } else {
+        _stateModelFactory.removeStateModel(_message.getResourceName(), 
partitionKey);
+      } else if 
(_stateModel.getCurrentState().equals(_message.getFromState())) {
         // if the partition is not to be dropped, update _stateModel to the 
TO_STATE
+        // Need this check because TaskRunner may change _stateModel before 
reaching here.
         _stateModel.updateState(toState);
       }
-    } else if (taskResult.isCancelled()) {
-      // Cancelled message does not need current state update
-      return;
-    } else {
+    } else if (!taskResult.isCancelled()) {
       if (exception instanceof HelixStateMismatchException) {
         // if fromState mismatch, set current state on zk to stateModel's 
current state
         logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. 
partitionKey: "
             + partitionKey + ", currentState: " + 
_stateModel.getCurrentState() + ", message: "
             + _message);
         _currentStateDelta.setState(partitionKey, 
_stateModel.getCurrentState());
-      } else {
-        StateTransitionError error =
-            new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, 
exception);
-        if (exception instanceof InterruptedException) {
-          if (_isTimeout) {
-            error = new StateTransitionError(ErrorType.INTERNAL, 
ErrorCode.TIMEOUT, exception);
-          } else {
-            // State transition interrupted but not caused by timeout. Keep 
the current
-            // state in this case
-            logger.error(
-                "State transition interrupted but not timeout. Not updating 
state. Partition : "
-                    + _message.getPartitionName() + " MsgId : " + 
_message.getMsgId());
-            return;
-          }
-        }
-        _stateModel.rollbackOnError(_message, _notificationContext, error);
-        _currentStateDelta.setState(partitionKey, 
HelixDefinedState.ERROR.toString());
-        _stateModel.updateState(HelixDefinedState.ERROR.toString());
+        updateZKCurrentState();
+        return;
       }
-    }
+      // Handle timeout interrupt.
+      if ((exception instanceof InterruptedException) && !_isTimeout) {
+        // State transition interrupted but not caused by timeout. Keep the 
current
+        // state in this case
+        logger.error(
+            "State transition interrupted but not timeout. Not updating state. 
Partition : "
+                + _message.getPartitionName() + " MsgId : " + 
_message.getMsgId());
+        return;
+      }
+      // Non-timeout interrupts returned above, so an interrupt here is a timeout.
+      StateTransitionError error = new StateTransitionError(ErrorType.INTERNAL,
+          exception instanceof InterruptedException ? ErrorCode.TIMEOUT : 
ErrorCode.ERROR,
+          exception);
+      _stateModel.rollbackOnError(_message, _notificationContext, error);
+      _currentStateDelta.setState(partitionKey, 
HelixDefinedState.ERROR.toString());
+      _stateModel.updateState(HelixDefinedState.ERROR.toString());
+    } // NOP if taskResult.isCancelled()
+
+    updateZKCurrentState();
+  }
+
+  // Update the ZK current state of the node
+  private void updateZKCurrentState(){
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    String partitionKey = _message.getPartitionName();
+    String resource = _message.getResourceName();
+    String sessionId = _message.getTgtSessionId();
+    String instanceName = _manager.getInstanceName();
 
     try {
-      // Update the ZK current state of the node
+      // We did not update _stateModel for DROPPED state, so it won't match 
_stateModel.
+      if 
(!_stateModel.getCurrentState().equals(_currentStateDelta.getState(partitionKey))
+          && !_currentStateDelta.getState(partitionKey)
+          .equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+        logger.warn("_stateModel is already updated by TaskRunner. Skip ZK 
update in StateTransitionHandler");
+        return;
+      }
+      int bucketSize = _message.getBucketSize();
+      ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
       PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? 
accessor.keyBuilder()
           .taskCurrentState(instanceName, sessionId, resource,
               bucketizer.getBucketName(partitionKey)) : accessor.keyBuilder()
@@ -275,8 +237,9 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
         // normal message
         if (!accessor.updateProperty(key, _currentStateDelta)) {
-          throw new HelixException("Fails to persist current state back to ZK 
for resource "
-              + resource + " partition: " + _message.getPartitionName());
+          throw new HelixException(
+              "Fails to persist current state back to ZK for resource " + 
resource + " partition: "
+                  + _message.getPartitionName());
         }
       } else {
         // sub-message of a batch message
@@ -293,6 +256,8 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, 
e,
           "Error when update current-state ", _manager);
     }
+
+
   }
 
   @Override
@@ -347,12 +312,12 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
         taskResult.setInterrupted(e instanceof InterruptedException);
       }
     }
-
     taskResult.setCompleteTime(System.currentTimeMillis());
     // add task result to context for postHandling
     context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
-    postHandleMessage();
-
+    synchronized (_stateModel) {
+      postHandleMessage();
+    }
     return taskResult;
   }
 
@@ -375,7 +340,6 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
           "Instance %s, partition %s received state transition from %s to %s 
on session %s, message id: %s",
           message.getTgtName(), message.getPartitionName(), 
message.getFromState(),
           message.getToState(), message.getTgtSessionId(), 
message.getMsgId()));
-
       if (_cancelled) {
         throw new HelixRollbackException(String.format(
             "Instance %s, partition %s state transition from %s to %s on 
session %s has been cancelled, message id: %s",
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 71ef39f..0e8aace 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -844,7 +844,6 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         // skip the following operations for the no-op messages.
         continue;
       }
-
       NotificationContext msgWorkingContext = changeContext.clone();
       MessageHandler msgHandler = null;
       try {
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
 
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index 3a218b8..2c88a68 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -39,8 +39,8 @@ public abstract class StateModel {
 
   // @transition(from='from', to='to')
   public void defaultTransitionHandler() {
-    logger
-        .error("Default default handler. The idea is to invoke this if no 
transition method is found. Yet to be implemented");
+    logger.error(
+        "Default default handler. The idea is to invoke this if no transition 
method is found. Yet to be implemented");
   }
 
   public boolean updateState(String newState) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 68017c3..794182d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -52,16 +52,18 @@ public class TaskRunner implements Runnable {
   private volatile boolean _timeout = false;
   // If true, indicates that the task has finished.
   private volatile boolean _done = false;
+  private TaskStateModel _stateModel;
 
 
   public TaskRunner(Task task, String taskName, String taskPartition, String 
instance,
-      HelixManager manager, String sessionId) {
+      HelixManager manager, String sessionId, TaskStateModel stateModel) {
     _task = task;
     _taskName = taskName;
     _taskPartition = taskPartition;
     _instance = instance;
     _manager = manager;
     _sessionId = sessionId;
+    _stateModel = stateModel;
   }
 
   @Override
@@ -79,22 +81,22 @@ public class TaskRunner implements Runnable {
 
       switch (_result.getStatus()) {
       case COMPLETED:
-        requestStateTransition(TaskPartitionState.COMPLETED);
+        updateCurrentState(TaskPartitionState.COMPLETED);
         break;
       case CANCELED:
         if (_timeout) {
-          requestStateTransition(TaskPartitionState.TIMED_OUT);
+          updateCurrentState(TaskPartitionState.TIMED_OUT);
         }
         // Else the state transition to CANCELED was initiated by the 
controller.
         break;
       case ERROR:
-        requestStateTransition(TaskPartitionState.TASK_ERROR);
+        updateCurrentState(TaskPartitionState.TASK_ERROR);
         break;
       case FAILED:
-        requestStateTransition(TaskPartitionState.TASK_ERROR);
+        updateCurrentState(TaskPartitionState.TASK_ERROR);
         break;
       case FATAL_FAILED:
-        requestStateTransition(TaskPartitionState.TASK_ABORTED);
+        updateCurrentState(TaskPartitionState.TASK_ABORTED);
         break;
       default:
         throw new AssertionError("Unknown task result type: " + 
_result.getStatus().name());
@@ -103,7 +105,7 @@ public class TaskRunner implements Runnable {
       LOG.error("Problem running the task, report task as FAILED.", e);
       _result =
           new TaskResult(Status.FAILED, "Exception happened in running task: " 
+ e.getMessage());
-      requestStateTransition(TaskPartitionState.TASK_ERROR);
+      updateCurrentState(TaskPartitionState.TASK_ERROR);
     } finally {
       synchronized (_doneSync) {
         _done = true;
@@ -180,15 +182,15 @@ public class TaskRunner implements Runnable {
    * Requests the controller for a state transition.
    * @param state The state transition that is being requested.
    */
-  private void requestStateTransition(TaskPartitionState state) {
-    boolean success =
-        setRequestedState(_manager.getHelixDataAccessor(), _instance, 
_sessionId, _taskName,
-            _taskPartition, state);
-    if (!success) {
-      LOG.error(String
-          .format(
-              "Failed to set the requested state to %s for instance %s, 
session id %s, task partition %s.",
-              state, _instance, _sessionId, _taskPartition));
+  private void updateCurrentState(TaskPartitionState state) {
+    synchronized (_stateModel) {
+      _stateModel.updateState(state.name());
+      if (!setZKCurrentState(_manager.getHelixDataAccessor(), _instance, 
_sessionId, _taskName,
+          _taskPartition, state)) {
+        LOG.error(String.format(
+            "Failed to set the requested state to %s for instance %s, session 
id %s, task partition %s.",
+            state, _instance, _sessionId, _taskPartition));
+      }
     }
   }
 
@@ -203,23 +205,29 @@ public class TaskRunner implements Runnable {
    * @param state     the requested state
    * @return true if the request was persisted, false otherwise
    */
-  private static boolean setRequestedState(HelixDataAccessor accessor, String 
instance,
+  private boolean setZKCurrentState(HelixDataAccessor accessor, String 
instance,
       String sessionId, String resource, String partition, TaskPartitionState 
state) {
     LOG.debug(
-        String.format("Requesting a state transition to %s for partition %s.", 
state, partition));
+        String.format("Updating current state %s for partition %s.", state, 
partition));
     try {
       PropertyKey.Builder keyBuilder = accessor.keyBuilder();
       PropertyKey key =
           
Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED) ? 
keyBuilder
               .currentState(instance, sessionId, resource)
               : keyBuilder.taskCurrentState(instance, sessionId, resource);
-      CurrentState currStateDelta = new CurrentState(resource);
-      currStateDelta.setRequestedState(partition, state.name());
 
-      return accessor.updateProperty(key, currStateDelta);
+      String prevState = _stateModel.getCurrentState();
+      CurrentState currentStateDelta = new CurrentState(resource);
+      currentStateDelta.setSessionId(sessionId);
+      currentStateDelta.setStateModelDefRef(TaskConstants.STATE_MODEL_NAME);
+      currentStateDelta.setState(partition, state.name());
+      currentStateDelta.setInfo(partition, _result.getInfo());
+      currentStateDelta.setPreviousState(partition, prevState);
+
+      return accessor.updateProperty(key, currentStateDelta);
     } catch (Exception e) {
       LOG.error(String
-          .format("Error when requesting a state transition to %s for 
partition %s.", state,
+          .format("Error when updating current state  to %s for partition 
%s.", state,
               partition), e);
       return false;
     }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 3771a57..fbac9dd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -327,7 +327,7 @@ public class TaskStateModel extends StateModel {
     // Submit the task for execution
     _taskRunner =
         new TaskRunner(task, msg.getResourceName(), taskPartition, 
msg.getTgtName(), _manager,
-            msg.getTgtSessionId());
+            msg.getTgtSessionId(), this);
     _taskExecutor.submit(_taskRunner);
     _taskRunner.waitTillStarted();
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
index 08dc776..51b03bf 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -323,7 +323,7 @@ public class TestStopWorkflow extends TaskTestBase {
         e.printStackTrace();
       }
 
-      return new TaskResult(TaskResult.Status.COMPLETED, "");
+      return new TaskResult(TaskResult.Status.CANCELED, "");
     }
 
     @Override

Reply via email to