Improve Helix performance by avoiding reading a known message multiple times.

Keep track of messages that have already been read using a cache in HelixTaskExecutor.
If a message has already been read and marked, skip reading it again.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fbb19543
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fbb19543
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fbb19543

Branch: refs/heads/master
Commit: fbb195433858b365afb300f1d8f822e955a2d04e
Parents: b5f4638
Author: Jiajun Wang <jjw...@linkedin.com>
Authored: Thu Sep 7 17:45:58 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:07:49 2017 -0800

----------------------------------------------------------------------
 .../org/apache/helix/NotificationContext.java   |  45 ++++++-
 .../helix/manager/zk/CallbackHandler.java       |   4 +
 .../messaging/handling/HelixTaskExecutor.java   | 123 +++++++++++--------
 .../handling/TestHelixTaskExecutor.java         |  61 ++++++++-
 4 files changed, 176 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/main/java/org/apache/helix/NotificationContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java 
b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index 81e3485..ba8b4e9 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -39,12 +39,14 @@ public class NotificationContext {
 
   private HelixManager _manager;
   private Type _type;
+  private HelixConstants.ChangeType _changeType;
   private String _pathChanged;
   private String _eventName;
   private long _creationTime;
 
   /**
    * Get the name associated with the event
+   *
    * @return event name
    */
   public String getEventName() {
@@ -53,6 +55,7 @@ public class NotificationContext {
 
   /**
    * Set the name associated with the event
+   *
    * @param eventName the event name
    */
   public void setEventName(String eventName) {
@@ -61,6 +64,7 @@ public class NotificationContext {
 
   /**
    * Instantiate with a HelixManager
+   *
    * @param manager {@link HelixManager} object
    */
   public NotificationContext(HelixManager manager) {
@@ -71,6 +75,7 @@ public class NotificationContext {
 
   /**
    * Get the HelixManager associated with this notification
+   *
    * @return {@link HelixManager} object
    */
   public HelixManager getManager() {
@@ -79,6 +84,7 @@ public class NotificationContext {
 
   /**
    * Get a map describing the update (keyed on {@link MapKey})
+   *
    * @return the object map describing the update
    */
   public Map<String, Object> getMap() {
@@ -87,6 +93,7 @@ public class NotificationContext {
 
   /**
    * Get the type of the notification
+   *
    * @return the notification type
    */
   public Type getType() {
@@ -95,23 +102,35 @@ public class NotificationContext {
 
   /**
    * Set the HelixManager associated with this notification
+   *
    * @param manager {@link HelixManager} object
    */
   public void setManager(HelixManager manager) {
     this._manager = manager;
   }
 
+  /**
+   * Gets creation time.
+   *
+   * @return the creation time
+   */
   public long getCreationTime() {
     return _creationTime;
   }
 
+  /**
+   * Sets creation time.
+   *
+   * @param creationTime the creation time
+   */
   public void setCreationTime(long creationTime) {
     _creationTime = creationTime;
   }
 
   /**
    * Add notification metadata
-   * @param key String value of a {@link MapKey}
+   *
+   * @param key   String value of a {@link MapKey}
    * @param value
    */
   public void add(String key, Object value) {
@@ -120,6 +139,7 @@ public class NotificationContext {
 
   /**
    * Set the notification map
+   *
    * @param map
    */
   public void setMap(Map<String, Object> map) {
@@ -128,6 +148,7 @@ public class NotificationContext {
 
   /**
    * Set the notification type
+   *
    * @param {@link Type} object
    */
   public void setType(Type type) {
@@ -136,8 +157,8 @@ public class NotificationContext {
 
   /**
    * Get a notification attribute
+   *
    * @param key String from a {@link MapKey}
-   * @return
    */
   public Object get(String key) {
     return _map.get(key);
@@ -154,6 +175,7 @@ public class NotificationContext {
 
   /**
    * Get the path changed status
+   *
    * @return String corresponding to the path change
    */
   public String getPathChanged() {
@@ -162,9 +184,28 @@ public class NotificationContext {
 
   /**
    * Set the path changed status
+   *
    * @param pathChanged
    */
   public void setPathChanged(String pathChanged) {
     this._pathChanged = pathChanged;
   }
+
+  /**
+   * Gets the change type.
+   *
+   * @return the change type
+   */
+  public HelixConstants.ChangeType getChangeType() {
+    return _changeType;
+  }
+
+  /**
+   * Sets the change type.
+   *
+   * @param changeType the change type
+   */
+  public void setChangeType(HelixConstants.ChangeType changeType) {
+    this._changeType = changeType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index c63680f..41fae3e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -511,6 +511,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.INIT);
+      changeContext.setChangeType(_changeType);
       enqueueTask(changeContext);
     } catch (Exception e) {
       String msg = "Exception while invoking init callback for listener:" + 
_listener;
@@ -528,6 +529,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
         NotificationContext changeContext = new NotificationContext(_manager);
         changeContext.setType(NotificationContext.Type.CALLBACK);
         changeContext.setPathChanged(dataPath);
+        changeContext.setChangeType(_changeType);
         enqueueTask(changeContext);
       }
     } catch (Exception e) {
@@ -579,6 +581,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
           NotificationContext changeContext = new 
NotificationContext(_manager);
           changeContext.setType(NotificationContext.Type.CALLBACK);
           changeContext.setPathChanged(parentPath);
+          changeContext.setChangeType(_changeType);
           enqueueTask(changeContext);
         }
       }
@@ -596,6 +599,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);
+      changeContext.setChangeType(_changeType);
       enqueueTask(changeContext);
     } catch (Exception e) {
       String msg = "Exception while resetting the listener:" + _listener;

http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index ad4a276..d650fbb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -19,35 +19,12 @@ package org.apache.helix.messaging.handling;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.Criteria;
-import org.apache.helix.HelixConstants;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.MessageListener;
-import org.apache.helix.NotificationContext;
+import org.apache.helix.*;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
@@ -66,6 +43,9 @@ import 
org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
+import java.util.*;
+import java.util.concurrent.*;
+
 public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   /**
    * Put together all registration information about a message handler factory
@@ -125,6 +105,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
   final ConcurrentHashMap<String, String> _messageTaskMap;
 
+  final Set<String> _knownMessageIds;
+
   /* Resources whose configuration for dedicate thread pool has been checked.*/
   final Set<String> _resourcesThreadpoolChecked;
   final Set<String> _transitionTypeThreadpoolChecked;
@@ -140,11 +122,12 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
 
   public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor) {
     _monitor = participantStatusMonitor;
-    _taskMap = new ConcurrentHashMap<String, MessageTaskInfo>();
+    _taskMap = new ConcurrentHashMap<>();
 
-    _hdlrFtyRegistry = new ConcurrentHashMap<String, 
MsgHandlerFactoryRegistryItem>();
-    _executorMap = new ConcurrentHashMap<String, ExecutorService>();
-    _messageTaskMap = new ConcurrentHashMap<String, String>();
+    _hdlrFtyRegistry = new ConcurrentHashMap<>();
+    _executorMap = new ConcurrentHashMap<>();
+    _messageTaskMap = new ConcurrentHashMap<>();
+    _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<String, 
Boolean>());
     _batchMessageExecutorService = Executors.newCachedThreadPool();
     _monitor.createExecutorMonitor("BatchMessageExecutor", 
_batchMessageExecutorService);
 
@@ -500,9 +483,10 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
   private void updateMessageState(List<Message> readMsgs, HelixDataAccessor 
accessor,
       String instanceName) {
     Builder keyBuilder = accessor.keyBuilder();
-    List<PropertyKey> readMsgKeys = new ArrayList<PropertyKey>();
+    List<PropertyKey> readMsgKeys = new ArrayList<>();
     for (Message msg : readMsgs) {
       readMsgKeys.add(msg.getKey(keyBuilder, instanceName));
+      _knownMessageIds.add(msg.getId());
     }
     accessor.setChildren(readMsgKeys, readMsgs);
   }
@@ -589,6 +573,8 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
     _messageTaskMap.clear();
 
+    _knownMessageIds.clear();
+
     _lastSessionSyncTime = null;
   }
 
@@ -611,7 +597,6 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         LOG.info("Skip init a new thread pool for type: " + msgType + ", 
already existing pool: "
             + prevPool + ", isShutdown: " + prevPool.isShutdown());
         newPool.shutdown();
-        newPool = null;
       } else {
         _monitor.createExecutorMonitor(msgType, newPool);
       }
@@ -641,7 +626,47 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     }
   }
 
+  private List<Message> readNewMessagesFromZK(HelixManager manager, String 
instanceName,
+      HelixConstants.ChangeType changeType) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    Set<String> messageIds = new HashSet<>();
+    if (changeType.equals(HelixConstants.ChangeType.MESSAGE)) {
+      
messageIds.addAll(accessor.getChildNames(keyBuilder.messages(instanceName)));
+    } else if 
(changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER)) {
+      
messageIds.addAll(accessor.getChildNames(keyBuilder.controllerMessages()));
+    } else {
+      LOG.warn("Unexpected ChangeType for Message Change CallbackHandler: " + 
changeType);
+      return Collections.emptyList();
+    }
+
+    // In case the cache contains any deleted message Id, clean up
+    _knownMessageIds.retainAll(messageIds);
+
+    messageIds.removeAll(_knownMessageIds);
+    List<PropertyKey> keys = new ArrayList<>();
+    for (String messageId : messageIds) {
+      if (changeType.equals(HelixConstants.ChangeType.MESSAGE)) {
+        keys.add(keyBuilder.message(instanceName, messageId));
+      } else if 
(changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER)) {
+        keys.add(keyBuilder.controllerMessage(messageId));
+      }
+    }
+
+    List<Message> newMessages = accessor.getProperty(keys);
+    // A message may be removed before it is read; clean up the resulting null entries.
+    Iterator<Message> messageIterator = newMessages.iterator();
+    while(messageIterator.hasNext()) {
+      if (messageIterator.next() == null) {
+        messageIterator.remove();
+      }
+    }
+    return newMessages;
+  }
+
   @Override
+  @PreFetch(enabled = false)
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
 
@@ -664,6 +689,11 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       // continue to process messages
     }
 
+    if (messages == null || messages.isEmpty()) {
+      // If no messages are given, check and read all new messages.
+      messages = readNewMessagesFromZK(manager, instanceName, 
changeContext.getChangeType());
+    }
+
     if (_isShuttingDown) {
       StringBuilder sb = new StringBuilder();
       for (Message message : messages) {
@@ -689,18 +719,18 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     Builder keyBuilder = accessor.keyBuilder();
 
     // message handlers created
-    Map<String, MessageHandler> stateTransitionHandlers = new HashMap<String, 
MessageHandler>();
-    List<MessageHandler> nonStateTransitionHandlers = new 
ArrayList<MessageHandler>();
+    Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>();
+    List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>();
 
     // message read
-    List<Message> readMsgs = new ArrayList<Message>();
+    List<Message> readMsgs = new ArrayList<>();
 
     String sessionId = manager.getSessionId();
     List<String> curResourceNames =
         accessor.getChildNames(keyBuilder.currentStates(instanceName, 
sessionId));
-    List<PropertyKey> createCurStateKeys = new ArrayList<PropertyKey>();
-    List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
-    Set<String> createCurStateNames = new HashSet<String>();
+    List<PropertyKey> createCurStateKeys = new ArrayList<>();
+    List<CurrentState> metaCurStates = new ArrayList<>();
+    Set<String> createCurStateNames = new HashSet<>();
 
     for (Message message : messages) {
       // nop messages are simply removed. It is used to trigger onMessage() in
@@ -708,7 +738,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) 
{
         LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: "
             + message.getMsgSrc());
-        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        removeMessageFromZk(accessor, message, instanceName);
         continue;
       }
 
@@ -721,7 +751,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
                 + ", tgtSessionId in message: " + tgtSessionId + ", messageId: 
"
                 + message.getMsgId();
         LOG.warn(warningMessage);
-        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        removeMessageFromZk(accessor, message, instanceName);
         _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, 
warningMessage, accessor);
 
         // Proactively send a session sync message from participant to 
controller
@@ -743,7 +773,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         PropertyKey key = new 
Builder(manager.getClusterName()).liveInstances();
         List<LiveInstance> liveInstances = 
manager.getHelixDataAccessor().getChildValues(key);
         _controller.onLiveInstanceChange(liveInstances, changeContext);
-        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        removeMessageFromZk(accessor, message, instanceName);
         _monitor.reportReceivedMessage(message);
         _monitor.reportProcessedMessage(message, 
ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
         continue;
@@ -846,7 +876,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, 
error, accessor);
 
         message.setMsgState(MessageState.UNPROCESSABLE);
-        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        removeMessageFromZk(accessor, message, instanceName);
         LOG.error("Message cannot be processed: " + message.getRecord(), e);
         _monitor.reportProcessedMessage(message, 
ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
         continue;
@@ -937,6 +967,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
   }
 
   private void removeMessageFromTaskAndFutureMap(Message message) {
+    _knownMessageIds.remove(message.getId());
     String messageTarget = getMessageTarget(message.getResourceName(), 
message.getPartitionName());
     if (_messageTaskMap.containsKey(messageTarget)) {
       _messageTaskMap.remove(messageTarget);
@@ -966,13 +997,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
   private void removeMessageFromZk(HelixDataAccessor accessor, Message message,
       String instanceName) {
-    Builder keyBuilder = accessor.keyBuilder();
-    if (message.getTgtName().equalsIgnoreCase("controller")) {
-      // TODO: removeProperty returns boolean
-      
accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
-    } else {
-      accessor.removeProperty(keyBuilder.message(instanceName, 
message.getMsgId()));
-    }
+    accessor.removeProperty(message.getKey(accessor.keyBuilder(), 
instanceName));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index c53f09f..40370d5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -20,17 +20,14 @@ package org.apache.helix.messaging.handling;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.listeners.ClusterConfigChangeListener;
-import org.apache.helix.api.listeners.ResourceConfigChangeListener;
+import org.apache.helix.*;
 import org.apache.helix.mock.MockManager;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.testng.Assert;
@@ -50,7 +47,7 @@ public class TestHelixTaskExecutor {
 
   class TestMessageHandlerFactory implements MessageHandlerFactory {
     int _handlersCreated = 0;
-    ConcurrentHashMap<String, String> _processedMsgIds = new 
ConcurrentHashMap<String, String>();
+    ConcurrentHashMap<String, String> _processedMsgIds = new 
ConcurrentHashMap<>();
 
     class TestMessageHandler extends MessageHandler {
       public TestMessageHandler(Message message, NotificationContext context) {
@@ -277,6 +274,7 @@ public class TestHelixTaskExecutor {
       msg.setCorrelationId(UUID.randomUUID().toString());
       msgList.add(msg);
     }
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
 
     Thread.sleep(1000);
@@ -328,6 +326,7 @@ public class TestHelixTaskExecutor {
       msg.setSrcName("127.101.1.23_2234");
       msgList.add(msg);
     }
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
 
     Thread.sleep(1000);
@@ -377,6 +376,7 @@ public class TestHelixTaskExecutor {
       msg.setTgtName("");
       msgList.add(msg);
     }
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
 
     Thread.sleep(1000);
@@ -424,6 +424,7 @@ public class TestHelixTaskExecutor {
     exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
     msgList.add(exceptionMsg);
 
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
 
     Thread.sleep(1000);
@@ -466,6 +467,7 @@ public class TestHelixTaskExecutor {
       msg.setSrcName("127.101.1.23_2234");
       msgListToCancel.add(msg);
     }
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
     Thread.sleep(500);
     for (int i = 0; i < nMsgs2; i++) {
@@ -535,6 +537,7 @@ public class TestHelixTaskExecutor {
       msgList.add(msg);
     }
     NotificationContext changeContext = new NotificationContext(manager);
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("some", msgList, changeContext);
     Thread.sleep(500);
     for (ExecutorService svc : executor._executorMap.values()) {
@@ -572,6 +575,7 @@ public class TestHelixTaskExecutor {
       msg.setExecutionTimeout((i + 1) * 600);
       msgList.add(msg);
     }
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
 
     Thread.sleep(4000);
@@ -618,6 +622,7 @@ public class TestHelixTaskExecutor {
       msg.setRetryCount(1);
       msgList.add(msg);
     }
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
     Thread.sleep(3500);
     AssertJUnit.assertEquals(factory._processedMsgIds.size(), 3);
@@ -663,10 +668,54 @@ public class TestHelixTaskExecutor {
     msg2.setToState("MASTER");
     msgList.add(msg2);
 
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
     executor.onMessage("someInstance", msgList, changeContext);
 
     Thread.sleep(3000);
     AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0);
     AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 
0);
   }
+
+  @Test
+  public void testMessageReadOptimization() throws InterruptedException {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
+
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    List<String> messageIds = new ArrayList<>();
+    int nMsgs1 = 5;
+    for (int i = 0; i < nMsgs1; i++) {
+      Message msg = new Message(factory.getMessageTypes().get(0), 
UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setCorrelationId(UUID.randomUUID().toString());
+      accessor.setProperty(keyBuilder.message("someInstance", msg.getId()), 
msg);
+      messageIds.add(msg.getId());
+    }
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+
+    // Simulate read message already, then processing message. Should read and 
handle no message.
+    executor._knownMessageIds.addAll(messageIds);
+    executor.onMessage("someInstance", Collections.EMPTY_LIST, changeContext);
+    Thread.sleep(3000);
+    AssertJUnit.assertEquals(0, factory._processedMsgIds.size());
+    executor._knownMessageIds.clear();
+
+    // Processing message normally
+    executor.onMessage("someInstance", Collections.EMPTY_LIST, changeContext);
+    Thread.sleep(3000);
+    AssertJUnit.assertEquals(nMsgs1, factory._processedMsgIds.size());
+    // After all messages are processed, _knownMessageIds should be empty.
+    Assert.assertTrue(executor._knownMessageIds.isEmpty());
+  }
 }

Reply via email to