[HELIX-669] State Transition Cancellation Client side change Part II

In Helix, there are many different scenarios that can make pending state 
transitions no longer valid; for example, a resource is deleted while it 
still has pending transitions, or Helix calculates a new ideal mapping 
while there are still pending transitions that do not match the new mapping.  In 
such cases, the Helix controller should proactively cancel these pending 
transitions instead of waiting for them to finish.

In this rb:
1. Support registering a MessageHandlerFactory for multiple message types.
2. Refactor related API
3. Add unit tests for multi-message-type registration.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b9de8362
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b9de8362
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b9de8362

Branch: refs/heads/master
Commit: b9de8362bc75c98aa49110c78ce17edf9ef52456
Parents: aa2f641
Author: Junkai Xue <j...@linkedin.com>
Authored: Tue Oct 3 12:13:50 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Tue Oct 3 12:24:48 2017 -0700

----------------------------------------------------------------------
 .../resources/SchedulerTasksResource.java       |   4 +-
 .../apache/helix/ClusterMessagingService.java   |  19 ++-
 .../controller/rebalancer/AutoRebalancer.java   |  10 +-
 .../stages/ExternalViewComputeStage.java        |   2 +-
 .../apache/helix/examples/BootstrapProcess.java |  13 +-
 .../manager/zk/ControllerManagerHelper.java     |  19 ++-
 .../DefaultControllerMessageHandlerFactory.java |  13 +-
 ...ltParticipantErrorMessageHandlerFactory.java |  10 +-
 .../DefaultSchedulerMessageHandlerFactory.java  |  23 ++--
 .../helix/manager/zk/ParticipantManager.java    |   2 +-
 .../messaging/DefaultMessagingService.java      |  20 ++-
 .../handling/AsyncCallbackService.java          |  12 +-
 .../handling/HelixBatchMessageTask.java         |   6 +
 ...HelixStateTransitionCancellationHandler.java |  21 ++-
 .../handling/HelixStateTransitionHandler.java   |   9 ++
 .../helix/messaging/handling/HelixTask.java     |  23 +++-
 .../messaging/handling/HelixTaskExecutor.java   |  38 +++---
 .../messaging/handling/MessageHandler.java      |   6 +
 .../handling/MessageHandlerFactory.java         |   5 +
 .../helix/messaging/handling/MessageTask.java   |   1 +
 .../messaging/handling/MessageTaskInfo.java     |   4 +
 .../java/org/apache/helix/model/Message.java    |   2 +-
 .../participant/HelixStateMachineEngine.java    |  11 +-
 .../participant/statemachine/StateModel.java    |  10 +-
 .../src/test/java/org/apache/helix/Mocks.java   |   5 +
 .../org/apache/helix/TestHelixTaskExecutor.java |   2 +-
 .../helix/integration/TestMessagingService.java |  32 +++--
 .../helix/integration/TestSchedulerMessage.java |  43 +++---
 .../integration/TestSchedulerMessage2.java      |   4 +-
 .../integration/TestSchedulerMsgContraints.java |   6 +-
 .../integration/TestSchedulerMsgUsingQueue.java |   2 +-
 .../helix/integration/TestZkSessionExpiry.java  |   9 +-
 .../messaging/TestDefaultMessagingService.java  |  57 ++++++++
 .../handling/TestConfigThreadpoolSize.java      |  11 ++
 .../handling/TestHelixTaskExecutor.java         | 130 +++++++++++++------
 .../mock/participant/MockDelayMSStateModel.java |  24 +++-
 36 files changed, 449 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git 
a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
 
b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index 76cb7a6..1602c3a 100644
--- 
a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ 
b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -149,9 +149,9 @@ public class SchedulerTasksResource extends ServerResource {
 
       Map<String, String> resultMap = new HashMap<String, String>();
       resultMap.put("StatusUpdatePath", PropertyPathBuilder.getPath(
-          PropertyType.STATUSUPDATES_CONTROLLER, clusterName, 
MessageType.SCHEDULER_MSG.toString(),
+          PropertyType.STATUSUPDATES_CONTROLLER, clusterName, 
MessageType.SCHEDULER_MSG.name(),
           schedulerMessage.getMsgId()));
-      resultMap.put("MessageType", 
Message.MessageType.SCHEDULER_MSG.toString());
+      resultMap.put("MessageType", Message.MessageType.SCHEDULER_MSG.name());
       resultMap.put("MsgId", schedulerMessage.getMsgId());
 
       // Assemble the rest URL for task status update

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java 
b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
index 6ff76e9..96a5957 100644
--- a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
@@ -121,11 +121,26 @@ public interface ClusterMessagingService {
    *          The message type that the factory will create handler for
    * @param factory
    *          The per-type message factory
-   * @param threadpoolSize
-   *          size of the execution threadpool that handles the message
    */
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory 
factory);
 
+
+  /**
+   * This will register a message handler factory to create handlers for
+   * message. In case client code defines its own message type, it can define a
+   * message handler factory to create handlers to process those messages.
+   * Messages are processed in a threadpool which is hosted by cluster manager,
+   * and cluster manager will call the factory to create handler, and the
+   * handler is called in the threadpool.
+   * Note that only one message handler factory can be registered with one
+   * message type.
+   * @param types
+   *          The different message types that the factory will create handler 
for
+   * @param factory
+   *          The per-type message factory
+   */
+  public void registerMessageHandlerFactory(List<String> types, 
MessageHandlerFactory factory);
+
   /**
    * This will generate all messages to be sent given the recipientCriteria 
and MessageTemplate,
    * the messages are not sent.

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 8096d5a..9cfcc81 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -27,17 +27,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 /**
@@ -53,9 +49,9 @@ import org.apache.log4j.Logger;
 public class AutoRebalancer extends AbstractRebalancer {
   private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
 
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState 
currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+  @Override public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
     List<String> partitions = new 
ArrayList<String>(currentIdealState.getPartitionSet());
     String stateModelName = currentIdealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = 
clusterData.getStateModelDef(stateModelName);

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 5eaf08a..db69c17 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -241,7 +241,7 @@ public class ExternalViewComputeStage extends 
AbstractBaseStage {
     if (controllerMsgUpdates.size() > 0) {
       for (String controllerMsgId : controllerMsgUpdates.keySet()) {
         PropertyKey controllerStatusUpdateKey =
-            
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), 
controllerMsgId);
+            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), 
controllerMsgId);
         StatusUpdate controllerStatusUpdate = 
accessor.getProperty(controllerStatusUpdateKey);
         for (String taskPartitionName : 
controllerMsgUpdates.get(controllerMsgId).keySet()) {
           Map<String, String> result = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java 
b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
index ce4cf10..04a6c35 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
@@ -21,6 +21,7 @@ package org.apache.helix.examples;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -45,6 +46,8 @@ import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * This process does little more than handling the state transition messages.
  * This is generally the case when the server needs to bootstrap when it comes
@@ -109,7 +112,7 @@ public class BootstrapProcess {
     manager.getMessagingService().registerMessageHandlerFactory(
         MessageType.STATE_TRANSITION.name(), stateMach);
     manager.getMessagingService().registerMessageHandlerFactory(
-        MessageType.USER_DEFINE_MSG.toString(), new 
CustomMessageHandlerFactory());
+        MessageType.USER_DEFINE_MSG.name(), new CustomMessageHandlerFactory());
     manager.connect();
   }
 
@@ -121,9 +124,15 @@ public class BootstrapProcess {
       return new CustomMessageHandler(message, context);
     }
 
+    @Deprecated
     @Override
     public String getMessageType() {
-      return MessageType.USER_DEFINE_MSG.toString();
+      return MessageType.USER_DEFINE_MSG.name();
+    }
+
+    @Override
+    public List<String> getMessageTypes() {
+      return ImmutableList.of(MessageType.USER_DEFINE_MSG.name());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index 9a817e3..6c43193 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -54,20 +54,25 @@ public class ControllerManagerHelper {
       _manager.addControllerMessageListener(_messagingService.getExecutor());
       MessageHandlerFactory defaultControllerMsgHandlerFactory =
           new DefaultControllerMessageHandlerFactory();
-      _messagingService.getExecutor().registerMessageHandlerFactory(
-          defaultControllerMsgHandlerFactory.getMessageType(), 
defaultControllerMsgHandlerFactory);
+      for (String type : defaultControllerMsgHandlerFactory.getMessageTypes()) 
{
+        _messagingService.getExecutor()
+            .registerMessageHandlerFactory(type, 
defaultControllerMsgHandlerFactory);
+      }
 
       MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
           new DefaultSchedulerMessageHandlerFactory(_manager);
-      _messagingService.getExecutor().registerMessageHandlerFactory(
-          defaultSchedulerMsgHandlerFactory.getMessageType(), 
defaultSchedulerMsgHandlerFactory);
+      for (String type : defaultSchedulerMsgHandlerFactory.getMessageTypes()) {
+        _messagingService.getExecutor()
+            .registerMessageHandlerFactory(type, 
defaultSchedulerMsgHandlerFactory);
+      }
 
       MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
           new DefaultParticipantErrorMessageHandlerFactory(_manager);
-      _messagingService.getExecutor().registerMessageHandlerFactory(
-          defaultParticipantErrorMessageHandlerFactory.getMessageType(),
 
-          defaultParticipantErrorMessageHandlerFactory);
+      for (String type : 
defaultParticipantErrorMessageHandlerFactory.getMessageTypes()) {
+        _messagingService.getExecutor()
+            .registerMessageHandlerFactory(type, 
defaultParticipantErrorMessageHandlerFactory);
+      }
 
       /**
        * setup generic-controller

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
index 5f6d083..6a01ef2 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
@@ -19,6 +19,8 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.util.List;
+
 import org.apache.helix.HelixException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -28,6 +30,8 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableList;
+
 public class DefaultControllerMessageHandlerFactory implements 
MessageHandlerFactory {
   private static Logger _logger = 
Logger.getLogger(DefaultControllerMessageHandlerFactory.class);
 
@@ -45,7 +49,12 @@ public class DefaultControllerMessageHandlerFactory 
implements MessageHandlerFac
 
   @Override
   public String getMessageType() {
-    return MessageType.CONTROLLER_MSG.toString();
+    return MessageType.CONTROLLER_MSG.name();
+  }
+
+  @Override
+  public List<String> getMessageTypes() {
+    return ImmutableList.of(MessageType.CONTROLLER_MSG.name());
   }
 
   @Override
@@ -62,7 +71,7 @@ public class DefaultControllerMessageHandlerFactory 
implements MessageHandlerFac
     public HelixTaskResult handleMessage() throws InterruptedException {
       String type = _message.getMsgType();
       HelixTaskResult result = new HelixTaskResult();
-      if (!type.equals(MessageType.CONTROLLER_MSG.toString())) {
+      if (!type.equals(MessageType.CONTROLLER_MSG.name())) {
         throw new HelixException("Unexpected msg type for message " + 
_message.getMsgId()
             + " type:" + _message.getMsgType());
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
index d2e56eb..53b2be4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
@@ -20,6 +20,7 @@ package org.apache.helix.manager.zk;
  */
 
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -30,6 +31,8 @@ import 
org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * DefaultParticipantErrorMessageHandlerFactory works on controller side.
  * When the participant detects a critical error, it will send the 
PARTICIPANT_ERROR_REPORT
@@ -118,7 +121,12 @@ public class DefaultParticipantErrorMessageHandlerFactory 
implements MessageHand
 
   @Override
   public String getMessageType() {
-    return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
+    return Message.MessageType.PARTICIPANT_ERROR_REPORT.name();
+  }
+
+  @Override
+  public List<String> getMessageTypes() {
+    return 
ImmutableList.of(Message.MessageType.PARTICIPANT_ERROR_REPORT.name());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 8e4071c..827ed73 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -48,6 +48,8 @@ import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import com.google.common.collect.ImmutableList;
+
 /*
  * The current implementation supports throttling on STATE-TRANSITION type of 
message, transition SCHEDULED-COMPLETED.
  *
@@ -107,12 +109,12 @@ public class DefaultSchedulerMessageHandlerFactory 
implements MessageHandlerFact
       Builder keyBuilder = accessor.keyBuilder();
       ZNRecord statusUpdate =
           accessor.getProperty(
-              
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
                   originalMessage.getMsgId())).getRecord();
 
       statusUpdate.getMapFields().putAll(_resultSummaryMap);
       accessor.setProperty(
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
               originalMessage.getMsgId()), new StatusUpdate(statusUpdate));
 
     }
@@ -139,7 +141,12 @@ public class DefaultSchedulerMessageHandlerFactory 
implements MessageHandlerFact
 
   @Override
   public String getMessageType() {
-    return MessageType.SCHEDULER_MSG.toString();
+    return MessageType.SCHEDULER_MSG.name();
+  }
+
+  @Override
+  public List<String> getMessageTypes() {
+    return ImmutableList.of(MessageType.SCHEDULER_MSG.name());
   }
 
   @Override
@@ -219,11 +226,11 @@ public class DefaultSchedulerMessageHandlerFactory 
implements MessageHandlerFact
 
       ZNRecord statusUpdate =
           accessor.getProperty(
-              
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
                   _message.getMsgId())).getRecord();
 
       statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
-      
accessor.updateProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+      
accessor.updateProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
           _message.getMsgId()), new StatusUpdate(statusUpdate));
     }
 
@@ -247,7 +254,7 @@ public class DefaultSchedulerMessageHandlerFactory 
implements MessageHandlerFact
     public HelixTaskResult handleMessage() throws InterruptedException {
       String type = _message.getMsgType();
       HelixTaskResult result = new HelixTaskResult();
-      if (!type.equals(MessageType.SCHEDULER_MSG.toString())) {
+      if (!type.equals(MessageType.SCHEDULER_MSG.name())) {
         throw new HelixException("Unexpected msg type for message " + 
_message.getMsgId()
             + " type:" + _message.getMsgType());
       }
@@ -324,12 +331,12 @@ public class DefaultSchedulerMessageHandlerFactory 
implements MessageHandlerFact
 
       ZNRecord statusUpdate =
           accessor.getProperty(
-              
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
                   _message.getMsgId())).getRecord();
 
       statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
 
-      
accessor.setProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+      
accessor.setProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
           _message.getMsgId()), new StatusUpdate(statusUpdate));
 
       result.getTaskResultMap().put("ControllerResult",

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 209dbca..9078cd1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -331,7 +331,7 @@ public class ParticipantManager {
   }
 
   private void setupMsgHandler() throws Exception {
-    
_messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(),
+    
_messagingService.registerMessageHandlerFactory(_stateMachineEngine.getMessageTypes(),
         _stateMachineEngine);
     _manager.addMessageListener(_messagingService.getExecutor(), 
_instanceName);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index 42764f3..a1de26a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -20,6 +20,7 @@ package org.apache.helix.messaging;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +69,7 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
 
     _taskExecutor = new HelixTaskExecutor(_participantStatusMonitor);
     _asyncCallbackService = new AsyncCallbackService();
-    
_taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
+    _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
         _asyncCallbackService);
   }
 
@@ -207,11 +208,22 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
   }
 
   @Override
-  public synchronized void registerMessageHandlerFactory(String type, 
MessageHandlerFactory factory) {
+  public synchronized void registerMessageHandlerFactory(String type,
+      MessageHandlerFactory factory) {
+    registerMessageHandlerFactory(Collections.singletonList(type), factory);
+  }
+
+  @Override
+  public synchronized void registerMessageHandlerFactory(List<String> types,
+      MessageHandlerFactory factory) {
     if (_manager.isConnected()) {
-      registerMessageHandlerFactoryInternal(type, factory);
+      for (String type : types) {
+        registerMessageHandlerFactoryInternal(type, factory);
+      }
     } else {
-      _messageHandlerFactoriestobeAdded.put(type, factory);
+      for (String type : types) {
+        _messageHandlerFactoriestobeAdded.put(type, factory);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
index 46c595d..fec18ee 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
@@ -19,6 +19,7 @@ package org.apache.helix.messaging.handling;
  * under the License.
  */
 
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.HelixException;
@@ -28,6 +29,8 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableList;
+
 public class AsyncCallbackService implements MessageHandlerFactory {
   private final ConcurrentHashMap<String, AsyncCallback> _callbackMap =
       new ConcurrentHashMap<String, AsyncCallback>();
@@ -45,7 +48,7 @@ public class AsyncCallbackService implements 
MessageHandlerFactory {
   }
 
   void verifyMessage(Message message) {
-    if 
(!message.getMsgType().toString().equalsIgnoreCase(MessageType.TASK_REPLY.toString()))
 {
+    if 
(!message.getMsgType().toString().equalsIgnoreCase(MessageType.TASK_REPLY.name()))
 {
       String errorMsg =
           "Unexpected msg type for message " + message.getMsgId() + " type:" + 
message.getMsgType()
               + " Expected : " + MessageType.TASK_REPLY;
@@ -79,7 +82,12 @@ public class AsyncCallbackService implements 
MessageHandlerFactory {
 
   @Override
   public String getMessageType() {
-    return MessageType.TASK_REPLY.toString();
+    return MessageType.TASK_REPLY.name();
+  }
+
+  @Override
+  public List<String> getMessageTypes() {
+    return ImmutableList.of(MessageType.TASK_REPLY.name());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
index f707db1..0999be8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
@@ -114,4 +114,10 @@ public class HelixBatchMessageTask implements MessageTask {
       }
     }
   }
+
+  @Override
+  public boolean cancel() {
+    // TODO: implement this method if need for batch message cancel
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
index 3256d8d..0abfe86 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
@@ -40,20 +40,15 @@ public class HelixStateTransitionCancellationHandler 
extends MessageHandler {
   @Override
   public HelixTaskResult handleMessage() throws InterruptedException {
     HelixTaskResult taskResult = new HelixTaskResult();
-    synchronized (_stateModel) {
-      try {
-        _stateModel.cancel();
-        taskResult.setSuccess(true);
-      } catch (Exception e) {
-        taskResult.setSuccess(false);
-        taskResult.setMessage(e.toString());
-        taskResult.setException(e);
-      }
-
-      if (taskResult.isSuccess()) {
-        throw new HelixRollbackException("Cancellation success for " + 
_message.getMsgId());
-      }
+    try {
+      _stateModel.cancel();
+      taskResult.setSuccess(true);
+    } catch (Exception e) {
+      taskResult.setSuccess(false);
+      taskResult.setMessage(e.toString());
+      taskResult.setException(e);
     }
+
     return taskResult;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index e9c4f48..30d321f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -28,11 +28,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixRollbackException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
@@ -341,6 +343,13 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
                                 message.getToState(),
                                 message.getTgtSessionId()));
 
+      if (_cancelled) {
+        throw new HelixRollbackException(String.format(
+            "Instance %s, partition %s state transition from %s to %s on 
session %s has been cancelled",
+            message.getTgtName(), message.getPartitionName(), 
message.getFromState(),
+            message.getToState(), message.getTgtSessionId()));
+      }
+
       Object result = methodToInvoke.invoke(_stateModel, new Object[] { 
message, context });
       taskResult.setSuccess(true);
       String resultStr;

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 12193f0..05e9b89 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -49,6 +49,8 @@ public class HelixTask implements MessageTask {
   StatusUpdateUtil _statusUpdateUtil;
   HelixTaskExecutor _executor;
   volatile boolean _isTimeout = false;
+  volatile boolean _isStarted = false;
+  volatile boolean _isCancelled = false;
 
   public HelixTask(Message message, NotificationContext notificationContext,
       MessageHandler handler, HelixTaskExecutor executor) {
@@ -83,6 +85,7 @@ public class HelixTask implements MessageTask {
 
     // Handle the message
     try {
+      setStarted();
       taskResult = _handler.handleMessage();
     } catch (InterruptedException e) {
       taskResult = new HelixTaskResult();
@@ -147,6 +150,7 @@ public class HelixTask implements MessageTask {
           }
         } else if (taskResult.isCancelled()) {
           // Cancellation success, report message complete
+          type = null;
           _statusUpdateUtil
               .logInfo(_message, _handler.getClass(), "Cancellation completed 
successfully",
                   accessor);
@@ -210,7 +214,7 @@ public class HelixTask implements MessageTask {
 
   private void sendReply(HelixDataAccessor accessor, Message message, 
HelixTaskResult taskResult) {
     if (_message.getCorrelationId() != null
-        && !message.getMsgType().equals(MessageType.TASK_REPLY.toString())) {
+        && !message.getMsgType().equals(MessageType.TASK_REPLY.name())) {
       logger.info("Sending reply for message " + message.getCorrelationId());
       _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", 
accessor);
 
@@ -286,4 +290,21 @@ public class HelixTask implements MessageTask {
     _isTimeout = true;
     _handler.onTimeout();
   }
+
+  @Override
+  public synchronized boolean cancel() {
+    if (!_isStarted) {
+      _isCancelled = true;
+      _handler.cancel();
+      return true;
+    }
+    return false;
+  }
+
+  private synchronized void setStarted() {
+    if (_isCancelled) {
+      throw new HelixRollbackException("Task has already been cancelled");
+    }
+    _isStarted = true;
+  }
 };

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 41b5d0d..936dc56 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -123,7 +123,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
   final ConcurrentHashMap<String, ExecutorService> _executorMap;
 
-  final ConcurrentHashMap<String, Future<HelixTaskResult>> _messageFutureMap;
+  final ConcurrentHashMap<String, String> _messageTaskMap;
   private ExecutorService _cancellationExcutorService;
 
   /**
@@ -149,7 +149,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
     _hdlrFtyRegistry = new ConcurrentHashMap<String, 
MsgHandlerFactoryRegistryItem>();
     _executorMap = new ConcurrentHashMap<String, ExecutorService>();
-    _messageFutureMap = new ConcurrentHashMap<String, 
Future<HelixTaskResult>>();
+    _messageTaskMap = new ConcurrentHashMap<String, String>();
     _cancellationExcutorService = 
Executors.newFixedThreadPool(DEFAULT_CANCELLATION_THREADPOOL_SIZE);
     _batchMessageExecutorService = 
Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
     _batchMessageThreadpoolChecked = false;
@@ -173,9 +173,9 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory 
factory,
       int threadpoolSize) {
-    if (!type.equalsIgnoreCase(factory.getMessageType())) {
+    if (!factory.getMessageTypes().contains(type)) {
       throw new HelixException("Message factory type mismatch. Type: " + type 
+ ", factory: "
-          + factory.getMessageType());
+          + factory.getMessageTypes());
     }
 
     MsgHandlerFactoryRegistryItem newItem =
@@ -342,7 +342,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       String taskId = task.getTaskId();
       if (_taskMap.containsKey(taskId)) {
         MessageTaskInfo info = _taskMap.get(taskId);
-        removeMessageFromFutureMap(task.getMessage());
+        removeMessageFromTaskAndFutureMap(task.getMessage());
         if (info._timerTask != null) {
           info._timerTask.cancel();
         }
@@ -379,9 +379,9 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
           LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
           Future<HelixTaskResult> future = exeSvc.submit(task);
-          _messageFutureMap
+          _messageTaskMap
               .putIfAbsent(getMessageTarget(message.getResourceName(), 
message.getPartitionName()),
-                  future);
+                  taskId);
 
           TimerTask timerTask = null;
           if (message.getExecutionTimeout() > 0) {
@@ -429,7 +429,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
         // cancel task
         Future<HelixTaskResult> future = taskInfo.getFuture();
-        removeMessageFromFutureMap(message);
+        removeMessageFromTaskAndFutureMap(message);
         _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling 
task: " + taskId,
             notificationContext.getManager().getHelixDataAccessor());
 
@@ -463,7 +463,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     synchronized (_lock) {
       if (_taskMap.containsKey(taskId)) {
         MessageTaskInfo info = _taskMap.remove(taskId);
-        removeMessageFromFutureMap(message);
+        removeMessageFromTaskAndFutureMap(message);
         if (info._timerTask != null) {
           // ok to cancel multiple times
           info._timerTask.cancel();
@@ -559,7 +559,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     _taskMap.clear();
 
     shutdownAndAwaitTermination(_cancellationExcutorService);
-    _messageFutureMap.clear();
+    _messageTaskMap.clear();
 
     _lastSessionSyncTime = null;
   }
@@ -739,14 +739,16 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
           stateTransitionHandlers.remove(messageTarget);
           continue;
         } else {
-          if (_messageFutureMap.containsKey(messageTarget)) {
-            Future<HelixTaskResult> future = 
_messageFutureMap.get(messageTarget);
+          if (_messageTaskMap.containsKey(messageTarget)) {
+            // Lock the task object to avoid a race condition between cancelling and starting a task.
             // Cancel the from future without interrupt ->  Cancel the task 
future without
             // interruptting the state transition that is already started.  If 
the state transition
             // is already started, we should call cancel in the state model.
-            if (future.cancel(false) || future.isDone()) {
-              markReadMessage(message, changeContext, accessor);
-              readMsgs.add(message);
+            String taskId = _messageTaskMap.get(messageTarget);
+            HelixTask task = (HelixTask) _taskMap.get(taskId).getTask();
+            Future<HelixTaskResult> future = _taskMap.get(taskId).getFuture();
+            if (task.cancel()) {
+              future.cancel(false);
               _monitor.reportProcessedMessage(message,
                   ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
               removeMessageFromZk(accessor, message, instanceName);
@@ -874,10 +876,10 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     return handlerFactory.createHandler(message, changeContext);
   }
 
-  private void removeMessageFromFutureMap(Message message) {
+  private void removeMessageFromTaskAndFutureMap(Message message) {
     String messageTarget = getMessageTarget(message.getResourceName(), 
message.getPartitionName());
-    if (_messageFutureMap.containsKey(messageTarget)) {
-      _messageFutureMap.remove(messageTarget);
+    if (_messageTaskMap.containsKey(messageTarget)) {
+      _messageTaskMap.remove(messageTarget);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
index 506886d..089a8d0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
@@ -41,6 +41,7 @@ public abstract class MessageHandler {
    * The message to be handled
    */
   protected final Message _message;
+  protected boolean _cancelled;
 
   /**
    * The context for handling the message. The cluster manager interface can be
@@ -55,6 +56,7 @@ public abstract class MessageHandler {
   public MessageHandler(Message message, NotificationContext context) {
     _message = message;
     _notificationContext = context;
+    _cancelled = false;
   }
 
   /**
@@ -88,4 +90,8 @@ public abstract class MessageHandler {
   public Message getMessage() {
     return _message;
   }
+
+  public void cancel() {
+    _cancelled = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
index 6811adc..74ac948 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
@@ -19,13 +19,18 @@ package org.apache.helix.messaging.handling;
  * under the License.
  */
 
+import java.util.List;
+
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 
 public interface MessageHandlerFactory {
   MessageHandler createHandler(Message message, NotificationContext context);
 
+  @Deprecated
   String getMessageType();
 
+  List<String> getMessageTypes();
+
   void reset();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
index a17f8cc..ad355d3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
@@ -33,4 +33,5 @@ public interface MessageTask extends 
Callable<HelixTaskResult> {
 
   void onTimeout();
 
+  boolean cancel();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
index 28d028a..4a7923b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
@@ -37,4 +37,8 @@ public class MessageTaskInfo {
     return _future;
   }
 
+  public MessageTask getTask(){
+    return _task;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index ae90829..6f7bc87 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -116,7 +116,7 @@ public class Message extends HelixProperty {
    * @param msgId unique message identifier
    */
   public Message(MessageType type, String msgId) {
-    this(type.toString(), msgId);
+    this(type.name(), msgId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 4c38fa4..834c5c9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -19,6 +19,7 @@ package org.apache.helix.participant;
  * under the License.
  */
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +47,8 @@ import 
org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableList;
+
 public class HelixStateMachineEngine implements StateMachineEngine {
   private static Logger logger = 
Logger.getLogger(HelixStateMachineEngine.class);
 
@@ -162,7 +165,8 @@ public class HelixStateMachineEngine implements 
StateMachineEngine {
   public MessageHandler createHandler(Message message, NotificationContext 
context) {
     String type = message.getMsgType();
 
-    if (!type.equals(MessageType.STATE_TRANSITION.name())) {
+    if (!type.equals(MessageType.STATE_TRANSITION.name()) && !type
+        .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
       throw new HelixException("Expect state-transition message type, but was "
           + message.getMsgType() + ", msgId: " + message.getMsgId());
     }
@@ -255,6 +259,11 @@ public class HelixStateMachineEngine implements 
StateMachineEngine {
     return MessageType.STATE_TRANSITION.name();
   }
 
+  @Override public List<String> getMessageTypes() {
+    return ImmutableList
+        .of(MessageType.STATE_TRANSITION.name(), 
MessageType.STATE_TRANSITION_CANCELLATION.name());
+  }
+
   @Override
   public boolean removeStateModelFactory(String stateModelDef,
       StateModelFactory<? extends StateModel> factory) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
 
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index 56ea430..6f29723 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -25,7 +25,7 @@ import org.apache.log4j.Logger;
 
 public abstract class StateModel {
   static final String DEFAULT_INITIAL_STATE = "OFFLINE";
-  private boolean _cancelled;
+  protected boolean _cancelled;
   Logger logger = Logger.getLogger(StateModel.class);
 
   // TODO Get default state from implementation or from state model annotation
@@ -87,4 +87,12 @@ public abstract class StateModel {
   public void cancel() {
     _cancelled = true;
   }
+
+  /**
+   * Default implementation to check whether the state transition has been cancelled.
+   * @return the current value of the cancelled flag, which cancel() sets to true
+   */
+  public boolean isCancelled() {
+    return _cancelled;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java 
b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 70469d8..2401a4a 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -678,6 +678,11 @@ public class Mocks {
 
     }
 
+    @Override public void registerMessageHandlerFactory(List<String> types,
+        MessageHandlerFactory factory) {
+      // TODO Auto-generated method stub
+    }
+
     @Override
     public int send(Criteria receipientCriteria, Message message, 
AsyncCallback callbackOnReply,
         int timeOut, int retryCount) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java 
b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index c62cefd..d171843 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -62,7 +62,7 @@ public class TestHelixTaskExecutor {
 
     MockHelixTaskExecutor executor = new MockHelixTaskExecutor();
     MockStateModel stateModel = new MockStateModel();
-    executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
+    executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
         new AsyncCallbackService());
 
     NotificationContext context = new NotificationContext(manager);

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 030a0c0..f7b9591 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration;
  */
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.helix.Criteria;
@@ -36,6 +37,8 @@ import org.apache.helix.model.Message.MessageType;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 public class TestMessagingService extends ZkStandAloneCMTestBase {
   public static class TestMessagingHandlerFactory implements 
MessageHandlerFactory {
     public static HashSet<String> _processedMsgIds = new HashSet<String>();
@@ -51,6 +54,11 @@ public class TestMessagingService extends 
ZkStandAloneCMTestBase {
     }
 
     @Override
+    public List<String> getMessageTypes() {
+      return ImmutableList.of("TestExtensibility");
+    }
+
+    @Override
     public void reset() {
       // TODO Auto-generated method stub
 
@@ -87,11 +95,11 @@ public class TestMessagingService extends 
ZkStandAloneCMTestBase {
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    
_participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+    
_participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageTypes(),
         factory);
 
     String msgId = new UUID(123, 456).toString();
-    Message msg = new Message(factory.getMessageType(), msgId);
+    Message msg = new Message(factory.getMessageTypes().get(0), msgId);
     msg.setMsgId(msgId);
     msg.setSrcName(hostSrc);
     msg.setTgtSessionId("*");
@@ -179,14 +187,14 @@ public class TestMessagingService extends 
ZkStandAloneCMTestBase {
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    
_participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+    
_participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageTypes(),
         factory);
 
-    
_participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+    
_participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageTypes(),
         factory);
 
     String msgId = new UUID(123, 456).toString();
-    Message msg = new Message(factory.getMessageType(), msgId);
+    Message msg = new Message(factory.getMessageTypes().get(0), msgId);
     msg.setMsgId(msgId);
     msg.setSrcName(hostSrc);
 
@@ -246,11 +254,11 @@ public class TestMessagingService extends 
ZkStandAloneCMTestBase {
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    
_participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+    
_participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageTypes(),
         factory);
 
     String msgId = new UUID(123, 456).toString();
-    Message msg = new Message(factory.getMessageType(), msgId);
+    Message msg = new Message(factory.getMessageTypes().get(0), msgId);
     msg.setMsgId(msgId);
     msg.setSrcName(hostSrc);
 
@@ -287,11 +295,11 @@ public class TestMessagingService extends 
ZkStandAloneCMTestBase {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
+          factory.getMessageTypes(), factory);
 
     }
     String msgId = new UUID(123, 456).toString();
-    Message msg = new Message(new 
TestMessagingHandlerFactory().getMessageType(), msgId);
+    Message msg = new Message(new 
TestMessagingHandlerFactory().getMessageTypes().get(0), msgId);
     msg.setMsgId(msgId);
     msg.setSrcName(hostSrc);
 
@@ -351,11 +359,11 @@ public class TestMessagingService extends 
ZkStandAloneCMTestBase {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
+          factory.getMessageTypes(), factory);
 
     }
     String msgId = new UUID(123, 456).toString();
-    Message msg = new Message(new 
TestMessagingHandlerFactory().getMessageType(), msgId);
+    Message msg = new Message(new 
TestMessagingHandlerFactory().getMessageTypes().get(0), msgId);
     msg.setMsgId(msgId);
     msg.setSrcName(hostSrc);
 
@@ -387,7 +395,7 @@ public class TestMessagingService extends 
ZkStandAloneCMTestBase {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
+          factory.getMessageTypes(), factory);
 
     }
     String msgId = new UUID(123, 456).toString();

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index c41683a..373edb5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -58,6 +58,7 @@ import org.codehaus.jackson.map.SerializationConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
 
 public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
 
@@ -94,6 +95,10 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
       return "TestParticipant";
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of("TestParticipant");
+    }
+
     @Override
     public void reset() {
       // TODO Auto-generated method stub
@@ -150,6 +155,10 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
       return "TestMessagingHandlerLatch";
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of("TestMessagingHandlerLatch");
+    }
+
     @Override
     public void reset() {
       // TODO Auto-generated method stub
@@ -192,7 +201,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
+          _factory.getMessageTypes(), _factory);
 
       manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
@@ -206,7 +215,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
     // 
schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE,
     // "TestSchedulerMsg");
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
+    Message msg = new Message(_factory.getMessageTypes().get(0), "Template");
     msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
@@ -244,7 +253,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
 
     Assert.assertEquals(_PARTITIONS, _factory._results.size());
     PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
             schedulerMessage.getMsgId());
 
     int messageResultCount = 0;
@@ -326,7 +335,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
+          _factory.getMessageTypes(), _factory);
 
       manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
@@ -339,7 +348,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
     schedulerMessage.setSrcName("CONTROLLER");
 
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
+    Message msg = new Message(_factory.getMessageTypes().get(0), "Template");
     msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
@@ -373,7 +382,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
 
     Assert.assertEquals(0, _factory._results.size());
     PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
             schedulerMessage.getMsgId());
     for (int i = 0; i < 10; i++) {
       StatusUpdate update = 
helixDataAccessor.getProperty(controllerTaskStatus);
@@ -397,10 +406,10 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
+          _factory.getMessageTypes(), _factory);
 
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
+          _factory.getMessageTypes(), _factory);
 
       manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
@@ -413,7 +422,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
     schedulerMessage.setSrcName("CONTROLLER");
 
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
+    Message msg = new Message(_factory.getMessageTypes().get(0), "Template");
     msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
@@ -481,7 +490,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
       for (int j = 0; j < 100; j++) {
         Thread.sleep(200);
         PropertyKey controllerTaskStatus =
-            
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), 
msgId);
         ZNRecord statusUpdate = 
helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
         if (statusUpdate.getMapFields().containsKey("Summary")) {
           break;
@@ -489,7 +498,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBase {
       }
 
       PropertyKey controllerTaskStatus =
-          
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), 
msgId);
       ZNRecord statusUpdate = 
helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       
Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
           .equals("" + (_PARTITIONS * 3 / 5)));
@@ -518,10 +527,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
+          _factory.getMessageTypes(), _factory);
 
       _participants[i].getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
+          _factory.getMessageTypes(), _factory);
 
       manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
@@ -534,7 +543,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
     schedulerMessage.setSrcName("CONTROLLER");
 
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), "Template");
+    Message msg = new Message(_factory.getMessageTypes().get(0), "Template");
     msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
@@ -617,7 +626,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
       for (int j = 0; j < 100; j++) {
         Thread.sleep(200);
         PropertyKey controllerTaskStatus =
-            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
         ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
         if (statusUpdate.getMapFields().containsKey("Summary")) {
           // System.err.println(msgId+" done");
@@ -626,7 +635,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
       }
 
       PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
           .equals("" + (_PARTITIONS * 3 / 5)));
@@ -646,7 +655,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
     for (int j = 0; j < 100; j++) {
       Thread.sleep(200);
       PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgIdPrime);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       if (statusUpdate.getMapFields().containsKey("Summary")) {
         break;

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java
index b0ee961..71e33c8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java
@@ -106,7 +106,7 @@ public class TestSchedulerMessage2 extends ZkStandAloneCMTestBase {
     for (int i = 0; i < 10; i++) {
       Thread.sleep(200);
       PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       if (statusUpdate.getMapFields().containsKey("Summary")) {
         break;
@@ -115,7 +115,7 @@ public class TestSchedulerMessage2 extends ZkStandAloneCMTestBase {
 
     Assert.assertEquals(_PARTITIONS, _factory._results.size());
     PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
     ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
     Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
         .equals("" + (_PARTITIONS * 3)));

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
index 51a225e..91f8b07 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
@@ -131,7 +131,7 @@ public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBase {
     for (int j = 0; j < 10; j++) {
       Thread.sleep(200);
       PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
         Assert.assertEquals(
@@ -156,7 +156,7 @@ public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBase {
     for (int j = 0; j < 10; j++) {
       Thread.sleep(200);
       PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       if (statusUpdate.getMapFields().containsKey("Summary")) {
         break;
@@ -165,7 +165,7 @@ public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBase {
 
     Assert.assertEquals(_PARTITIONS, factory._results.size());
     PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
     ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
     Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
         .equals("" + (_PARTITIONS * 3)));

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
index e7deefa..4dfcb6e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
@@ -103,7 +103,7 @@ public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBase {
 
     Assert.assertEquals(_PARTITIONS, _factory._results.size());
     PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
             schedulerMessage.getMsgId());
 
     int messageResultCount = 0;

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
index f027422..0fcd78e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
@@ -41,6 +42,8 @@ import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 public class TestZkSessionExpiry extends ZkUnitTestBase {
   final static String DUMMY_MSG_TYPE = "DUMMY";
 
@@ -85,6 +88,10 @@ public class TestZkSessionExpiry extends ZkUnitTestBase {
       return DUMMY_MSG_TYPE;
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of(DUMMY_MSG_TYPE);
+    }
+
     @Override
     public void reset() {
       // Do nothing
@@ -158,7 +165,7 @@ public class TestZkSessionExpiry extends ZkUnitTestBase {
   /**
    * trigger dummy message handler and verify it's invoked
    * @param manager
-   * @param handledMsgMap
+   * @param handledMsgSet
    * @throws Exception
    */
   private static void checkDummyMsgHandler(HelixManager manager,

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
index 95abd29..3cc23e6 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -21,6 +21,7 @@ package org.apache.helix.messaging;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.helix.Criteria;
@@ -40,9 +41,12 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
+import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 public class TestDefaultMessagingService {
   class MockHelixManager extends Mocks.MockManager {
     class MockDataAccessor extends Mocks.MockAccessor {
@@ -154,6 +158,10 @@ public class TestDefaultMessagingService {
       return "TestingMessageHandler";
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of("TestingMessageHandler");
+    }
+
     @Override
     public void reset() {
       // TODO Auto-generated method stub
@@ -161,6 +169,41 @@ public class TestDefaultMessagingService {
     }
   }
 
+  class TestStateTransitionHandlerFactory implements MessageHandlerFactory {
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      return null;
+    }
+
+    @Override
+    public String getMessageType() {
+      return null;
+    }
+
+    @Override
+    public List<String> getMessageTypes() {
+      return ImmutableList.of(Message.MessageType.STATE_TRANSITION.name(),
+          Message.MessageType.STATE_TRANSITION_CANCELLATION.name(),
+          Message.MessageType.CONTROLLER_MSG.name());
+    }
+
+    @Override
+    public void reset() {
+
+    }
+  }
+
+  class MockDefaultMessagingService extends DefaultMessagingService {
+    public MockDefaultMessagingService(HelixManager manager) {
+      super(manager);
+    }
+
+    public Map<String, MessageHandlerFactory> getMessageHandlerFactoryMap() {
+      return _messageHandlerFactoriestobeAdded;
+    }
+  }
+
   @Test()
   public void TestMessageSend() {
     HelixManager manager = new MockHelixManager();
@@ -243,4 +286,18 @@ public class TestDefaultMessagingService {
     recipientCriteria.setPartition("%");
     AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
   }
+
+  @Test public void testMultipleMessageTypeRegisteration() {
+    HelixManager manager = new Mocks.MockManager();
+    MockDefaultMessagingService svc = new MockDefaultMessagingService(manager);
+    TestStateTransitionHandlerFactory factory = new TestStateTransitionHandlerFactory();
+    svc.registerMessageHandlerFactory(factory.getMessageTypes(), factory);
+
+    Assert.assertTrue(
+        svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.STATE_TRANSITION.name()));
+    Assert.assertTrue(svc.getMessageHandlerFactoryMap()
+        .containsKey(Message.MessageType.STATE_TRANSITION_CANCELLATION.name()));
+    Assert.assertTrue(
+        svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.CONTROLLER_MSG.name()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index 385d761..13ef2fd 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -20,6 +20,7 @@ package org.apache.helix.messaging.handling;
  */
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.helix.ConfigAccessor;
@@ -36,6 +37,8 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
   public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
     public static HashSet<String> _processedMsgIds = new HashSet<String>();
@@ -50,6 +53,10 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
       return "TestMsg";
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of("TestMsg");
+    }
+
     @Override
     public void reset() {
       // TODO Auto-generated method stub
@@ -70,6 +77,10 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
       return "TestMsg2";
     }
 
+    @Override public List<String> getMessageTypes() {
+      return ImmutableList.of("TestMsg2");
+    }
+
     @Override
     public void reset() {
       // TODO Auto-generated method stub

Reply via email to