This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 215d407  Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure. (#1489)
215d407 is described below

commit 215d4072756651ea7f242adc68442780828ba0f9
Author: Jiajun Wang <[email protected]>
AuthorDate: Thu Oct 29 13:41:22 2020 -0700

    Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure. (#1489)
    
    There is a minor code logic change for optimization, but there is no business logic change in this PR.
---
 .../messaging/handling/HelixTaskExecutor.java      | 505 +++++++++++----------
 1 file changed, 270 insertions(+), 235 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 1e9c455..b039afd 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -76,6 +75,7 @@ import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   /**
    * Put together all registration information about a message handler factory
@@ -165,19 +165,18 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     _hdlrFtyRegistry = new ConcurrentHashMap<>();
     _executorMap = new ConcurrentHashMap<>();
     _messageTaskMap = new ConcurrentHashMap<>();
-    _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<String, 
Boolean>());
+    _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
     _batchMessageExecutorService = Executors.newCachedThreadPool();
     _monitor.createExecutorMonitor("BatchMessageExecutor", 
_batchMessageExecutorService);
 
-    _resourcesThreadpoolChecked =
-        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-    _transitionTypeThreadpoolChecked =
-        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+    _resourcesThreadpoolChecked = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
+    _transitionTypeThreadpoolChecked = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
 
     _lock = new Object();
     _statusUpdateUtil = new StatusUpdateUtil();
 
-    _timer = new Timer("HelixTaskExecutor_Timer", true); // created as a 
daemon timer thread to handle task timeout
+    // created as a daemon timer thread to handle task timeout
+    _timer = new Timer("HelixTaskExecutor_Timer", true);
 
     _isShuttingDown = false;
 
@@ -192,7 +191,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory 
factory,
       int threadpoolSize) {
-    if (factory instanceof  MultiTypeMessageHandlerFactory) {
+    if (factory instanceof MultiTypeMessageHandlerFactory) {
       if (!((MultiTypeMessageHandlerFactory) 
factory).getMessageTypes().contains(type)) {
         throw new HelixException("Message factory type mismatch. Type: " + 
type + ", factory: "
             + ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
@@ -200,35 +199,30 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     } else {
       if (!factory.getMessageType().equals(type)) {
         throw new HelixException(
-            "Message factory type mismatch. Type: " + type + ", factory: " + 
factory.getMessageType());
+            "Message factory type mismatch. Type: " + type + ", factory: " + 
factory
+                .getMessageType());
       }
     }
 
     _isShuttingDown = false;
 
-    MsgHandlerFactoryRegistryItem newItem = new 
MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
+    MsgHandlerFactoryRegistryItem newItem =
+        new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
     MsgHandlerFactoryRegistryItem prevItem = 
_hdlrFtyRegistry.putIfAbsent(type, newItem);
     if (prevItem == null) {
-      ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize, 
new ThreadFactory() {
-        @Override public Thread newThread(Runnable r) {
-          return new Thread(r, "HelixTaskExecutor-message_handle_thread_" + 
thread_uid.getAndIncrement());
-        }
-      });
-      ExecutorService prevExecutor = _executorMap.putIfAbsent(type, newPool);
-      if (prevExecutor != null) {
-        LOG.warn("Skip creating a new thread pool for type: " + type + ", 
already existing pool: "
-            + prevExecutor + ", isShutdown: " + prevExecutor.isShutdown());
-        newPool.shutdown();
-        newPool = null;
-      } else {
+      _executorMap.computeIfAbsent(type, msgType -> {
+        ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize, 
r -> new Thread(r,
+            "HelixTaskExecutor-message_handle_thread_" + 
thread_uid.getAndIncrement()));
         _monitor.createExecutorMonitor(type, newPool);
-      }
-      LOG.info("Registered message handler factory for type: " + type + ", 
poolSize: "
-          + threadpoolSize + ", factory: " + factory + ", pool: " + 
_executorMap.get(type));
+        return newPool;
+      });
+      LOG.info(
+          "Registered message handler factory for type: " + type + ", 
poolSize: " + threadpoolSize
+              + ", factory: " + factory + ", pool: " + _executorMap.get(type));
     } else {
       LOG.info("Skip register message handler factory for type: " + type + ", 
poolSize: "
-          + threadpoolSize + ", factory: " + factory + ", already existing 
factory: "
-          + prevItem.factory());
+          + threadpoolSize + ", factory: " + factory + ", already existing 
factory: " + prevItem
+          .factory());
       newItem = null;
     }
   }
@@ -287,9 +281,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       ConfigAccessor configAccessor = manager.getConfigAccessor();
       // Changes to this configuration on thread pool size will only take 
effect after the participant get restarted.
       if (configAccessor != null) {
-        HelixConfigScope scope =
-            new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
-                
.forCluster(manager.getClusterName()).forResource(resourceName).build();
+        HelixConfigScope scope = new 
HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
+            
.forCluster(manager.getClusterName()).forResource(resourceName).build();
 
         String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
         try {
@@ -303,11 +296,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       }
       final String key = getPerResourceStateTransitionPoolName(resourceName);
       if (threadpoolSize > 0) {
-        _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize, new 
ThreadFactory() {
-          @Override public Thread newThread(Runnable r) {
-            return new Thread(r, "GerenricHelixController-message_handle_" + 
key);
-          }
-        }));
+        _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize,
+            r -> new Thread(r, "GerenricHelixController-message_handle_" + 
key)));
         LOG.info("Added dedicate threadpool for resource: " + resourceName + " 
with size: "
             + threadpoolSize);
       } else {
@@ -336,7 +326,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
   ExecutorService findExecutorServiceForMsg(Message message) {
     ExecutorService executorService = _executorMap.get(message.getMsgType());
     if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
-      if(message.getBatchMessageMode() == true) {
+      if (message.getBatchMessageMode() == true) {
         executorService = _batchMessageExecutorService;
       } else {
         String resourceName = message.getResourceName();
@@ -344,9 +334,11 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
           String key = getPerResourceStateTransitionPoolName(resourceName);
           String perStateTransitionTypeKey =
               getStateTransitionType(key, message.getFromState(), 
message.getToState());
-          if (perStateTransitionTypeKey != null && 
_executorMap.containsKey(perStateTransitionTypeKey)) {
-            LOG.info(String.format("Find per state transition type thread pool 
for resource %s from %s to %s",
-                message.getResourceName(), message.getFromState(), 
message.getToState()));
+          if (perStateTransitionTypeKey != null && _executorMap
+              .containsKey(perStateTransitionTypeKey)) {
+            LOG.info(String
+                .format("Find per state transition type thread pool for 
resource %s from %s to %s",
+                    message.getResourceName(), message.getFromState(), 
message.getToState()));
             executorService = _executorMap.get(perStateTransitionTypeKey);
           } else if (_executorMap.containsKey(key)) {
             LOG.info("Find per-resource thread pool with key: " + key);
@@ -415,8 +407,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, 
message.getResourceName(),
           message.getPartitionName(), message.getFromState(), 
message.getToState());
 
-      _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
-          "Message handling task scheduled", manager);
+      _statusUpdateUtil
+          .logInfo(message, HelixTaskExecutor.class, "Message handling task 
scheduled", manager);
 
       // this sync guarantees that ExecutorService.submit() task and put 
taskInfo into map are
       // sync'ed
@@ -442,8 +434,9 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
           if (message.getExecutionTimeout() > 0) {
             timerTask = new MessageTimeoutTask(this, task);
             _timer.schedule(timerTask, message.getExecutionTimeout());
-            LOG.info("Message starts with timeout " + 
message.getExecutionTimeout() + " MsgId: "
-                + task.getTaskId());
+            LOG.info(
+                "Message starts with timeout " + message.getExecutionTimeout() 
+ " MsgId: " + task
+                    .getTaskId());
           } else {
             LOG.debug("Message does not have timeout. MsgId: " + 
task.getTaskId());
           }
@@ -458,8 +451,9 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       }
     } catch (Exception e) {
       LOG.error("Error while executing task. " + message, e);
-      _statusUpdateUtil.logError(message, HelixTaskExecutor.class, e, "Error 
while executing task "
-          + e, manager);
+      _statusUpdateUtil
+          .logError(message, HelixTaskExecutor.class, e, "Error while 
executing task " + e,
+              manager);
     }
     return false;
   }
@@ -493,8 +487,9 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
           _taskMap.remove(taskId);
           return true;
         } else {
-          _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
-              "fail to cancel task: " + taskId, 
notificationContext.getManager());
+          _statusUpdateUtil
+              .logInfo(message, HelixTaskExecutor.class, "fail to cancel task: 
" + taskId,
+                  notificationContext.getManager());
         }
       } else {
         _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
@@ -538,15 +533,14 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
        * We use the updater to avoid race condition between writing message to 
zk as READ state and removing message after ST is done
        * If there is no message at this path, meaning the message is removed 
so we do not write the message
        */
-      updaters.add(new DataUpdater<ZNRecord>() {
-        @Override
-        public ZNRecord update(ZNRecord currentData) {
-          if (currentData == null) {
-            LOG.warn("Message {} targets at {} has already been removed before 
it is set as READ on instance {}", msg.getId(), msg.getTgtName(), instanceName);
-            return null;
-          }
-          return msg.getRecord();
+      updaters.add(currentData -> {
+        if (currentData == null) {
+          LOG.warn(
+              "Message {} targets at {} has already been removed before it is 
set as READ on instance {}",
+              msg.getId(), msg.getTgtName(), instanceName);
+          return null;
         }
+        return msg.getRecord();
       });
     }
     accessor.updateChildren(readMsgPaths, updaters, AccessOption.PERSISTENT);
@@ -629,7 +623,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     // Log all tasks that fail to terminate
     for (String taskId : _taskMap.keySet()) {
       MessageTaskInfo info = _taskMap.get(taskId);
-      sb.append("Task: " + taskId + " fails to terminate. Message: " + 
info._task.getMessage() + "\n");
+      sb.append(
+          "Task: " + taskId + " fails to terminate. Message: " + 
info._task.getMessage() + "\n");
     }
 
     LOG.info(sb.toString());
@@ -654,31 +649,24 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     // Re-init all existing factories
     for (final String msgType : _hdlrFtyRegistry.keySet()) {
       MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
-      ExecutorService newPool =
-          Executors.newFixedThreadPool(item.threadPoolSize(), new 
ThreadFactory() {
-            @Override public Thread newThread(Runnable r) {
-              return new Thread(r, "HelixTaskExecutor-message_handle_" + 
msgType);
-            }
-          });
-      ExecutorService prevPool = _executorMap.putIfAbsent(msgType, newPool);
-      if (prevPool != null) {
-        // Will happen if we register and call init
-        LOG.info("Skip init a new thread pool for type: " + msgType + ", 
already existing pool: "
-            + prevPool + ", isShutdown: " + prevPool.isShutdown());
-        newPool.shutdown();
-      } else {
-        _monitor.createExecutorMonitor(msgType, newPool);
-      }
+      ExecutorService pool = _executorMap.computeIfAbsent(msgType, type -> {
+        ExecutorService newPool = 
Executors.newFixedThreadPool(item.threadPoolSize(),
+            r -> new Thread(r, "HelixTaskExecutor-message_handle_" + type));
+        _monitor.createExecutorMonitor(type, newPool);
+        return newPool;
+      });
+      LOG.info("Setup the thread pool for type: %s, isShutdown: %s", msgType, 
pool.isShutdown());
     }
   }
 
   private void syncSessionToController(HelixManager manager) {
-    if (_lastSessionSyncTime == null ||
-            System.currentTimeMillis() - _lastSessionSyncTime > 
SESSION_SYNC_INTERVAL) { // > delay since last sync
+    if (_lastSessionSyncTime == null || System.currentTimeMillis() - 
_lastSessionSyncTime
+        > SESSION_SYNC_INTERVAL) { // > delay since last sync
       HelixDataAccessor accessor = manager.getHelixDataAccessor();
       PropertyKey key = new 
Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC);
       if (accessor.getProperty(key) == null) {
-        LOG.info(String.format("Participant %s syncs session with controller", 
manager.getInstanceName()));
+        LOG.info(String
+            .format("Participant %s syncs session with controller", 
manager.getInstanceName()));
         Message msg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, 
SESSION_SYNC);
         msg.setSrcName(manager.getInstanceName());
         msg.setTgtSessionId("*");
@@ -734,7 +722,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     List<Message> newMessages = accessor.getProperty(keys, false);
     // Message may be removed before get read, clean up null messages.
     Iterator<Message> messageIterator = newMessages.iterator();
-    while(messageIterator.hasNext()) {
+    while (messageIterator.hasNext()) {
       if (messageIterator.next() == null) {
         messageIterator.remove();
       }
@@ -772,7 +760,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       for (Message message : messages) {
         sb.append(message.getMsgId() + ",");
       }
-      LOG.info("Helix task executor is shutting down, discard unprocessed 
messages : " + sb.toString());
+      LOG.info(
+          "Helix task executor is shutting down, ignore unprocessed messages : 
" + sb.toString());
       return;
     }
 
@@ -810,148 +799,36 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     Set<String> createCurStateNames = new HashSet<>();
 
     for (Message message : messages) {
-      try {
-        // nop messages are simply removed. It is used to trigger onMessage() 
in
-        // situations such as register a new message handler factory
-        if 
(message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) {
-          LOG.info(
-              "Dropping NO-OP message. mid: " + message.getId() + ", from: " + 
message.getMsgSrc());
-          reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
-          continue;
-        }
-
-        String tgtSessionId = message.getTgtSessionId();
-        // sessionId mismatch normally means message comes from expired 
session, just remove it
-        if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
-          String warningMessage =
-              "SessionId does NOT match. expected sessionId: " + sessionId
-                  + ", tgtSessionId in message: " + tgtSessionId + ", 
messageId: "
-                  + message.getMsgId();
-          LOG.warn(warningMessage);
-          reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
-          _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, 
warningMessage, manager);
-
-          // Proactively send a session sync message from participant to 
controller
-          // upon session mismatch after a new session is established
-          if (manager.getInstanceType() == InstanceType.PARTICIPANT
-              || manager.getInstanceType() == 
InstanceType.CONTROLLER_PARTICIPANT) {
-            if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
-              syncSessionToController(manager);
-            }
-          }
-          continue;
-        }
-
-        if ((manager.getInstanceType() == InstanceType.CONTROLLER
-            || manager.getInstanceType() == 
InstanceType.CONTROLLER_PARTICIPANT)
-            && 
MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
-          LOG.info(String.format("Controller received 
PARTICIPANT_SESSION_CHANGE msg from src: %s",
-              message.getMsgSrc()));
-          PropertyKey key = new 
Builder(manager.getClusterName()).liveInstances();
-          List<LiveInstance> liveInstances =
-              manager.getHelixDataAccessor().getChildValues(key, true);
-          _controller.onLiveInstanceChange(liveInstances, changeContext);
-          reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.COMPLETED);
-          continue;
-        }
-
-        // don't process message that is of READ or UNPROCESSABLE state
-        if (MessageState.NEW != message.getMsgState()) {
-          // It happens because we don't delete message right after
-          // read. Instead we keep it until the current state is updated.
-          // We will read the message again if there is a new message but we
-          // check for the status and ignore if its already read
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Message already read. msgId: " + message.getMsgId());
-          }
-          continue;
-        }
-
-        if (message.isExpired()) {
-          LOG.info(
-              "Dropping expired message. mid: " + message.getId() + ", from: " 
+ message.getMsgSrc()
-                  + " relayed from: " + message.getRelaySrcHost());
-          reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
-          continue;
-        }
-
-        // State Transition Cancellation
-        if 
(message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) 
{
-          boolean success = cancelNotStartedStateTransition(message, 
stateTransitionHandlers, accessor, instanceName);
-          if (success) {
-            continue;
-          }
-        }
-
-        _monitor.reportReceivedMessage(message);
-      } catch (Exception e) {
-        LOG.error("Failed to process the message {}. Deleting the message from 
ZK. Exception: {}",
-            message, e);
-        removeMessageFromTaskAndFutureMap(message);
-        removeMessageFromZK(accessor, message, instanceName);
+      if (checkAndProcessNoOpMessage(message, instanceName, changeContext, 
manager, sessionId,
+          stateTransitionHandlers)) {
+        // skip the following operations for the no-op messages.
         continue;
       }
-
       // create message handlers, if handlers not found, leave its state as NEW
       NotificationContext msgWorkingContext = changeContext.clone();
       try {
-        MessageHandler createHandler = createMessageHandler(message, 
msgWorkingContext);
-        if (createHandler == null) {
+        MessageHandler msgHandler = createMessageHandler(message, 
msgWorkingContext);
+        if (msgHandler == null) {
+          // Failed to create message handler, skip processing this message in 
this callback.
+          // The same message process will be retried in the next round.
           continue;
         }
         if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) 
|| message.getMsgType()
             .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
-          String messageTarget =
-              getMessageTarget(message.getResourceName(), 
message.getPartitionName());
-
-          if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
-              && isStateTransitionInProgress(messageTarget)) {
-
-            String taskId = _messageTaskMap.get(messageTarget);
-            Message msg = _taskMap.get(taskId).getTask().getMessage();
-
-            // If there is another state transition for same partition is 
going on,
-            // discard the message. Controller will resend if this is a valid 
message
-            String errMsg = String.format(
-                "Another state transition for %s:%s is in progress with msg: 
%s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(),
-                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
-                System.currentTimeMillis(), message.getFromState(), 
message.getToState());
-            handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor,
-                instanceName, manager);
-            continue;
-          }
-          if (createHandler instanceof HelixStateTransitionHandler) {
-            // We only check to state if there is no ST task 
scheduled/executing.
-            HelixStateTransitionHandler.StaleMessageValidateResult result =
-                ((HelixStateTransitionHandler) 
createHandler).staleMessageValidator();
-            if (!result.isValid) {
-              handleUnprocessableMessage(message, null /* exception */,
-                  result.exception.getMessage(), accessor, instanceName, 
manager);
-              continue;
-            }
-          }
-          if (stateTransitionHandlers.containsKey(messageTarget)) {
-            // If there are 2 messages in same batch about same partition's 
state transition,
-            // the later one is discarded
-            Message duplicatedMessage = 
stateTransitionHandlers.get(messageTarget)._message;
-            String errMsg = String.format(
-                "Duplicated state transition message: %s. Existing: %s->%s; 
New (Discarded): %s->%s",
-                message.getMsgId(), duplicatedMessage.getFromState(),
-                duplicatedMessage.getToState(), message.getFromState(), 
message.getToState());
-            handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor,
-                instanceName, manager);
+          if (validateAndProcessStateTransitionMessage(message, instanceName, 
manager,
+              stateTransitionHandlers, msgHandler)) {
+            // Need future process by triggering state transition
+            String msgTarget =
+                getMessageTarget(message.getResourceName(), 
message.getPartitionName());
+            stateTransitionHandlers.put(msgTarget, msgHandler);
+            stateTransitionContexts.put(msgTarget, msgWorkingContext);
+          } else {
+            // skip the following operations for the invalid/expired state 
transition messages.
             continue;
           }
-
-          stateTransitionHandlers
-              .put(getMessageTarget(message.getResourceName(), 
message.getPartitionName()),
-                  createHandler);
-          stateTransitionContexts
-              .put(getMessageTarget(message.getResourceName(), 
message.getPartitionName()),
-                  msgWorkingContext);
         } else {
-          nonStateTransitionHandlers.add(createHandler);
+          // Need future process non state transition messages by triggering 
the handler
+          nonStateTransitionHandlers.add(msgHandler);
           nonStateTransitionContexts.add(msgWorkingContext);
         }
       } catch (Exception e) {
@@ -959,15 +836,15 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
         continue;
       }
 
-      markReadMessage(message, msgWorkingContext, manager);
-      readMsgs.add(message);
-
+      // Update the processed message objects
+      readMsgs.add(markReadMessage(message, msgWorkingContext, manager));
       // batch creation of all current state meta data
       // do it for non-controller and state transition messages only
-      if (!message.isControlerMsg()
-          && 
message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
+      if (!message.isControlerMsg() && message.getMsgType()
+          .equals(Message.MessageType.STATE_TRANSITION.name())) {
         String resourceName = message.getResourceName();
-        if (!curResourceNames.contains(resourceName) && 
!createCurStateNames.contains(resourceName)) {
+        if (!curResourceNames.contains(resourceName) && !createCurStateNames
+            .contains(resourceName)) {
           createCurStateNames.add(resourceName);
           createCurStateKeys.add(keyBuilder.currentState(instanceName, 
sessionId, resourceName));
 
@@ -982,7 +859,6 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
           } else {
             
metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
           }
-
           metaCurStates.add(metaCurState);
         }
       }
@@ -997,30 +873,186 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read 
messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : 
stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = 
stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be 
done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is a no-op message and no further processing is required.
+   */
+  private boolean checkAndProcessNoOpMessage(Message message, String 
instanceName,
+      NotificationContext changeContext, HelixManager manager, String 
sessionId,
+      Map<String, MessageHandler> stateTransitionHandlers) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    try {
+      // nop messages are simply removed. It is used to trigger onMessage() in
+      // situations such as register a new message handler factory
+      if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) 
{
+        LOG.info(
+            "Dropping NO-OP message. mid: " + message.getId() + ", from: " + 
message.getMsgSrc());
+        reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
+        return true;
+      }
+
+      String tgtSessionId = message.getTgtSessionId();
+      // sessionId mismatch normally means message comes from expired session, 
just remove it
+      if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
+        String warningMessage = "SessionId does NOT match. expected sessionId: 
" + sessionId
+            + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + 
message.getMsgId();
+        LOG.warn(warningMessage);
+        reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
+        _statusUpdateUtil
+            .logWarning(message, HelixStateMachineEngine.class, 
warningMessage, manager);
+
+        // Proactively send a session sync message from participant to 
controller
+        // upon session mismatch after a new session is established
+        if (manager.getInstanceType() == InstanceType.PARTICIPANT
+            || manager.getInstanceType() == 
InstanceType.CONTROLLER_PARTICIPANT) {
+          if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
+            syncSessionToController(manager);
+          }
         }
+        return true;
+      }
+
+      if ((manager.getInstanceType() == InstanceType.CONTROLLER
+          || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+          && 
MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
+        LOG.info(String.format("Controller received PARTICIPANT_SESSION_CHANGE 
msg from src: %s",
+            message.getMsgSrc()));
+        PropertyKey key = new 
Builder(manager.getClusterName()).liveInstances();
+        List<LiveInstance> liveInstances = 
manager.getHelixDataAccessor().getChildValues(key, true);
+        _controller.onLiveInstanceChange(liveInstances, changeContext);
+        reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.COMPLETED);
+        return true;
       }
+
+      // don't process message that is of READ or UNPROCESSABLE state
+      if (MessageState.NEW != message.getMsgState()) {
+        // It happens because we don't delete message right after
+        // read. Instead we keep it until the current state is updated.
+        // We will read the message again if there is a new message but we
+        // check for the status and ignore it if it's already read
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Message already read. msgId: " + message.getMsgId());
+        }
+        return true;
+      }
+
+      if (message.isExpired()) {
+        LOG.info(
+            "Dropping expired message. mid: " + message.getId() + ", from: " + 
message.getMsgSrc()
+                + " relayed from: " + message.getRelaySrcHost());
+        reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
+        return true;
+      }
+
+      // State Transition Cancellation
+      if 
(message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) 
{
+        boolean success =
+            cancelNotStartedStateTransition(message, stateTransitionHandlers, 
accessor,
+                instanceName);
+        if (success) {
+          return true;
+        }
+      }
+
+      _monitor.reportReceivedMessage(message);
+    } catch (Exception e) {
+      LOG.error("Failed to process the message {}. Deleting the message from 
ZK. Exception: {}",
+          message, e);
+      removeMessageFromTaskAndFutureMap(message);
+      removeMessageFromZK(accessor, message, instanceName);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Preprocess the state transition message to validate if the request is 
valid.
+   * If no operation needs to be triggered, discard the message.
+   * @param message
+   * @param instanceName
+   * @param manager
+   * @param stateTransitionHandlers
+   * @param createHandler
+   * @return True if the requested state transition is valid and needs to be scheduled.
+   *         False if no more operation is required.
+   */
+  private boolean validateAndProcessStateTransitionMessage(Message message, 
String instanceName,
+      HelixManager manager, Map<String, MessageHandler> 
stateTransitionHandlers,
+      MessageHandler createHandler) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+    String messageTarget = getMessageTarget(message.getResourceName(), 
message.getPartitionName());
+    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+        && isStateTransitionInProgress(messageTarget)) {
+      String taskId = _messageTaskMap.get(messageTarget);
+      Message msg = _taskMap.get(taskId).getTask().getMessage();
+      // If another state transition for the same partition is in progress,
+      // discard the message. Controller will resend if this is a valid message
+      String errMsg = String.format(
+          "Another state transition for %s:%s is in progress with msg: %s, 
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+          message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(),
+          msg.isRelayMessage(), msg.getReadTimeStamp(), 
System.currentTimeMillis(),
+          message.getFromState(), message.getToState());
+      handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor, instanceName,
+          manager);
+      return false;
+    }
+    if (createHandler instanceof HelixStateTransitionHandler) {
+      // We only check to state if there is no ST task scheduled/executing.
+      HelixStateTransitionHandler.StaleMessageValidateResult result =
+          ((HelixStateTransitionHandler) 
createHandler).staleMessageValidator();
+      if (!result.isValid) {
+        handleUnprocessableMessage(message, null /* exception */, 
result.exception.getMessage(),
+            accessor, instanceName, manager);
+        return false;
+      }
+    }
+    if (stateTransitionHandlers.containsKey(messageTarget)) {
+      // If there are 2 messages in same batch about same partition's state 
transition,
+      // the later one is discarded
+      Message duplicatedMessage = 
stateTransitionHandlers.get(messageTarget)._message;
+      String errMsg = String.format(
+          "Duplicated state transition message: %s. Existing: %s->%s; New 
(Discarded): %s->%s",
+          message.getMsgId(), duplicatedMessage.getFromState(), 
duplicatedMessage.getToState(),
+          message.getFromState(), message.getToState());
+      handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor, instanceName,
+          manager);
+      return false;
+    }
+    return true;
+  }
+
+  private void scheduleTaskForMessage(String instanceName, HelixDataAccessor 
accessor,
+      MessageHandler handler, NotificationContext context) {
+    Message msg = handler._message;
+    if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+      // Remove message if schedule tasks are failed.
+      removeMessageFromTaskAndFutureMap(msg);
+      removeMessageFromZK(accessor, msg, instanceName);
     }
   }
 
@@ -1047,9 +1079,11 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
   //                              3. Message handled and task already started
   // This method tries to handle the first two cases, it returns true if no 
further cancellation is needed,
   // false if not been able to cancel the state transition (i.e, further 
cancellation is needed).
-  private boolean cancelNotStartedStateTransition(Message message, Map<String, 
MessageHandler> stateTransitionHandlers,
-      HelixDataAccessor accessor, String instanceName) {
-    String targetMessageName = getMessageTarget(message.getResourceName(), 
message.getPartitionName());
+  private boolean cancelNotStartedStateTransition(Message message,
+      Map<String, MessageHandler> stateTransitionHandlers, HelixDataAccessor 
accessor,
+      String instanceName) {
+    String targetMessageName =
+        getMessageTarget(message.getResourceName(), 
message.getPartitionName());
     ProcessedMessageState messageState;
     Message targetStateTransitionMessage;
 
@@ -1107,13 +1141,14 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     removeMessageFromZK(accessor, message, instanceName);
   }
 
-  private void markReadMessage(Message message, NotificationContext context,
+  private Message markReadMessage(Message message, NotificationContext context,
       HelixManager manager) {
     message.setMsgState(MessageState.READ);
     message.setReadTimeStamp(new Date().getTime());
     message.setExecuteSessionId(context.getManager().getSessionId());
 
     _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New 
Message", manager);
+    return message;
   }
 
   private void handleUnprocessableMessage(Message message, Exception 
exception, String errorMsg,
@@ -1128,12 +1163,12 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     }
     message.setMsgState(MessageState.UNPROCESSABLE);
     removeMessageFromZK(accessor, message, instanceName);
-    _monitor.reportProcessedMessage(message,
-        ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
-
+    _monitor
+        .reportProcessedMessage(message, 
ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
   }
+
   public MessageHandler createMessageHandler(Message message, 
NotificationContext changeContext) {
-    String msgType = message.getMsgType().toString();
+    String msgType = message.getMsgType();
 
     MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
 
@@ -1141,8 +1176,8 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     // we will keep the message and the message will be handled when
     // the corresponding MessageHandlerFactory is registered
     if (item == null) {
-      LOG.warn("Fail to find message handler factory for type: " + msgType + " 
msgId: "
-          + message.getMsgId());
+      LOG.warn("Fail to find message handler factory for type: " + msgType + " 
msgId: " + message
+          .getMsgId());
       return null;
     }
     MessageHandlerFactory handlerFactory = item.factory();
@@ -1171,7 +1206,7 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     return String.format("%s_%s", resourceName, partitionName);
   }
 
-  private String getStateTransitionType(String prefix, String fromState, 
String toState){
+  private String getStateTransitionType(String prefix, String fromState, 
String toState) {
     if (prefix == null || fromState == null || toState == null) {
       return null;
     }

Reply via email to