Improve Helix performance by avoiding reading a known message multiple times.
Keep tracking the already read messages using a cache in HelixTaskExecutor. If the message has been read and marked, skip reading that message again. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fbb19543 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fbb19543 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fbb19543 Branch: refs/heads/master Commit: fbb195433858b365afb300f1d8f822e955a2d04e Parents: b5f4638 Author: Jiajun Wang <jjw...@linkedin.com> Authored: Thu Sep 7 17:45:58 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:07:49 2017 -0800 ---------------------------------------------------------------------- .../org/apache/helix/NotificationContext.java | 45 ++++++- .../helix/manager/zk/CallbackHandler.java | 4 + .../messaging/handling/HelixTaskExecutor.java | 123 +++++++++++-------- .../handling/TestHelixTaskExecutor.java | 61 ++++++++- 4 files changed, 176 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/main/java/org/apache/helix/NotificationContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java index 81e3485..ba8b4e9 100644 --- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java +++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java @@ -39,12 +39,14 @@ public class NotificationContext { private HelixManager _manager; private Type _type; + private HelixConstants.ChangeType _changeType; private String _pathChanged; private String _eventName; private long _creationTime; /** * Get the name associated with the event + * * @return event name */ public String getEventName() { @@ -53,6 +55,7 @@ public class NotificationContext { /** * Set the name associated with the event + * * @param eventName the event name */ public void setEventName(String eventName) { @@ -61,6 +64,7 @@ public class NotificationContext { /** * Instantiate with a HelixManager + * * @param manager {@link HelixManager} object */ public NotificationContext(HelixManager manager) { @@ -71,6 +75,7 @@ public class NotificationContext { /** * Get the HelixManager associated with this notification + * * @return {@link HelixManager} object */ public HelixManager getManager() { @@ -79,6 +84,7 @@ public class NotificationContext { /** * Get a map describing the update (keyed on {@link MapKey}) + * * @return the object map describing the update */ public Map<String, Object> getMap() { @@ -87,6 +93,7 @@ public class NotificationContext { /** * Get the type of the notification + * * @return the notification type */ public Type getType() { @@ -95,23 +102,35 @@ public class NotificationContext { /** * Set the HelixManager associated with this notification + * * @param manager {@link HelixManager} object */ public void setManager(HelixManager manager) { this._manager = manager; } + /** + * Gets creation time. + * + * @return the creation time + */ public long getCreationTime() { return _creationTime; } + /** + * Sets creation time. + * + * @param creationTime the creation time + */ public void setCreationTime(long creationTime) { _creationTime = creationTime; } /** * Add notification metadata - * @param key String value of a {@link MapKey} + * + * @param key String value of a {@link MapKey} * @param value */ public void add(String key, Object value) { @@ -120,6 +139,7 @@ public class NotificationContext { /** * Set the notification map + * * @param map */ public void setMap(Map<String, Object> map) { @@ -128,6 +148,7 @@ public class NotificationContext { /** * Set the notification type + * * @param {@link Type} object */ public void setType(Type type) { @@ -136,8 +157,8 @@ public class NotificationContext { /** * Get a notification attribute + * * @param key String from a {@link MapKey} - * @return */ public Object get(String key) { return _map.get(key); @@ -154,6 +175,7 @@ public class NotificationContext { /** * Get the path changed status + * * @return String corresponding to the path change */ public String getPathChanged() { @@ -162,9 +184,28 @@ public class NotificationContext { /** * Set the path changed status + * * @param pathChanged */ public void setPathChanged(String pathChanged) { this._pathChanged = pathChanged; } + + /** + * Gets the change type. + * + * @return the change type + */ + public HelixConstants.ChangeType getChangeType() { + return _changeType; + } + + /** + * Sets the change type. + * + * @param changeType the change type + */ + public void setChangeType(HelixConstants.ChangeType changeType) { + this._changeType = changeType; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index c63680f..41fae3e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -511,6 +511,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { try { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.INIT); + changeContext.setChangeType(_changeType); enqueueTask(changeContext); } catch (Exception e) { String msg = "Exception while invoking init callback for listener:" + _listener; @@ -528,6 +529,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.CALLBACK); changeContext.setPathChanged(dataPath); + changeContext.setChangeType(_changeType); enqueueTask(changeContext); } } catch (Exception e) { @@ -579,6 +581,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.CALLBACK); changeContext.setPathChanged(parentPath); + changeContext.setChangeType(_changeType); enqueueTask(changeContext); } } @@ -596,6 +599,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { try { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.FINALIZE); + changeContext.setChangeType(_changeType); enqueueTask(changeContext); } catch (Exception e) { String msg = "Exception while resetting the listener:" + _listener; http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index ad4a276..d650fbb 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -19,35 +19,12 @@ package org.apache.helix.messaging.handling; * under the License. */ -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.helix.ConfigAccessor; -import org.apache.helix.Criteria; -import org.apache.helix.HelixConstants; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; -import org.apache.helix.InstanceType; -import org.apache.helix.MessageListener; -import org.apache.helix.NotificationContext; +import org.apache.helix.*; import org.apache.helix.NotificationContext.MapKey; import org.apache.helix.NotificationContext.Type; -import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.api.listeners.MessageListener; +import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.controller.GenericHelixController; import org.apache.helix.model.CurrentState; import org.apache.helix.model.HelixConfigScope; @@ -66,6 +43,9 @@ import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.util.StatusUpdateUtil; import org.apache.log4j.Logger; +import java.util.*; +import java.util.concurrent.*; + public class HelixTaskExecutor implements MessageListener, TaskExecutor { /** * Put together all registration information about a message handler factory @@ -125,6 +105,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { final ConcurrentHashMap<String, String> _messageTaskMap; + final Set<String> _knownMessageIds; + /* Resources whose configuration for dedicate thread pool has been checked.*/ final Set<String> _resourcesThreadpoolChecked; final Set<String> _transitionTypeThreadpoolChecked; @@ -140,11 +122,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor) { _monitor = participantStatusMonitor; - _taskMap = new ConcurrentHashMap<String, MessageTaskInfo>(); + _taskMap = new ConcurrentHashMap<>(); - _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>(); - _executorMap = new ConcurrentHashMap<String, ExecutorService>(); - _messageTaskMap = new ConcurrentHashMap<String, String>(); + _hdlrFtyRegistry = new ConcurrentHashMap<>(); + _executorMap = new ConcurrentHashMap<>(); + _messageTaskMap = new ConcurrentHashMap<>(); + _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); _batchMessageExecutorService = Executors.newCachedThreadPool(); _monitor.createExecutorMonitor("BatchMessageExecutor", _batchMessageExecutorService); @@ -500,9 +483,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { private void updateMessageState(List<Message> readMsgs, HelixDataAccessor accessor, String instanceName) { Builder keyBuilder = accessor.keyBuilder(); - List<PropertyKey> readMsgKeys = new ArrayList<PropertyKey>(); + List<PropertyKey> readMsgKeys = new ArrayList<>(); for (Message msg : readMsgs) { readMsgKeys.add(msg.getKey(keyBuilder, instanceName)); + _knownMessageIds.add(msg.getId()); } accessor.setChildren(readMsgKeys, readMsgs); } @@ -589,6 +573,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { _messageTaskMap.clear(); + _knownMessageIds.clear(); + _lastSessionSyncTime = null; } @@ -611,7 +597,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { LOG.info("Skip init a new thread pool for type: " + msgType + ", already existing pool: " + prevPool + ", isShutdown: " + prevPool.isShutdown()); newPool.shutdown(); - newPool = null; } else { _monitor.createExecutorMonitor(msgType, newPool); } @@ -641,7 +626,47 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } } + private List<Message> readNewMessagesFromZK(HelixManager manager, String instanceName, + HelixConstants.ChangeType changeType) { + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + + Set<String> messageIds = new HashSet<>(); + if (changeType.equals(HelixConstants.ChangeType.MESSAGE)) { + messageIds.addAll(accessor.getChildNames(keyBuilder.messages(instanceName))); + } else if (changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER)) { + messageIds.addAll(accessor.getChildNames(keyBuilder.controllerMessages())); + } else { + LOG.warn("Unexpected ChangeType for Message Change CallbackHandler: " + changeType); + return Collections.emptyList(); + } + + // In case the cache contains any deleted message Id, clean up + _knownMessageIds.retainAll(messageIds); + + messageIds.removeAll(_knownMessageIds); + List<PropertyKey> keys = new ArrayList<>(); + for (String messageId : messageIds) { + if (changeType.equals(HelixConstants.ChangeType.MESSAGE)) { + keys.add(keyBuilder.message(instanceName, messageId)); + } else if (changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER)) { + keys.add(keyBuilder.controllerMessage(messageId)); + } + } + + List<Message> newMessages = accessor.getProperty(keys); + // Message may be removed before get read, clean up null messages. + Iterator<Message> messageIterator = newMessages.iterator(); + while(messageIterator.hasNext()) { + if (messageIterator.next() == null) { + messageIterator.remove(); + } + } + return newMessages; + } + @Override + @PreFetch(enabled = false) public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) { @@ -664,6 +689,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // continue to process messages } + if (messages == null || messages.isEmpty()) { + // If no messages are given, check and read all new messages. + messages = readNewMessagesFromZK(manager, instanceName, changeContext.getChangeType()); + } + if (_isShuttingDown) { StringBuilder sb = new StringBuilder(); for (Message message : messages) { @@ -689,18 +719,18 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { Builder keyBuilder = accessor.keyBuilder(); // message handlers created - Map<String, MessageHandler> stateTransitionHandlers = new HashMap<String, MessageHandler>(); - List<MessageHandler> nonStateTransitionHandlers = new ArrayList<MessageHandler>(); + Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>(); + List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>(); // message read - List<Message> readMsgs = new ArrayList<Message>(); + List<Message> readMsgs = new ArrayList<>(); String sessionId = manager.getSessionId(); List<String> curResourceNames = accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId)); - List<PropertyKey> createCurStateKeys = new ArrayList<PropertyKey>(); - List<CurrentState> metaCurStates = new ArrayList<CurrentState>(); - Set<String> createCurStateNames = new HashSet<String>(); + List<PropertyKey> createCurStateKeys = new ArrayList<>(); + List<CurrentState> metaCurStates = new ArrayList<>(); + Set<String> createCurStateNames = new HashSet<>(); for (Message message : messages) { // nop messages are simply removed. It is used to trigger onMessage() in @@ -708,7 +738,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) { LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: " + message.getMsgSrc()); - accessor.removeProperty(message.getKey(keyBuilder, instanceName)); + removeMessageFromZk(accessor, message, instanceName); continue; } @@ -721,7 +751,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId(); LOG.warn(warningMessage); - accessor.removeProperty(message.getKey(keyBuilder, instanceName)); + removeMessageFromZk(accessor, message, instanceName); _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, accessor); // Proactively send a session sync message from participant to controller @@ -743,7 +773,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { PropertyKey key = new Builder(manager.getClusterName()).liveInstances(); List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key); _controller.onLiveInstanceChange(liveInstances, changeContext); - accessor.removeProperty(message.getKey(keyBuilder, instanceName)); + removeMessageFromZk(accessor, message, instanceName); _monitor.reportReceivedMessage(message); _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED); continue; @@ -846,7 +876,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, accessor); message.setMsgState(MessageState.UNPROCESSABLE); - accessor.removeProperty(message.getKey(keyBuilder, instanceName)); + removeMessageFromZk(accessor, message, instanceName); LOG.error("Message cannot be processed: " + message.getRecord(), e); _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED); continue; @@ -937,6 +967,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } private void removeMessageFromTaskAndFutureMap(Message message) { + _knownMessageIds.remove(message.getId()); String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName()); if (_messageTaskMap.containsKey(messageTarget)) { _messageTaskMap.remove(messageTarget); @@ -966,13 +997,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { private void removeMessageFromZk(HelixDataAccessor accessor, Message message, String instanceName) { - Builder keyBuilder = accessor.keyBuilder(); - if (message.getTgtName().equalsIgnoreCase("controller")) { - // TODO: removeProperty returns boolean - accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId())); - } else { - accessor.removeProperty(keyBuilder.message(instanceName, message.getMsgId())); - } + accessor.removeProperty(message.getKey(accessor.keyBuilder(), instanceName)); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/fbb19543/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java index c53f09f..40370d5 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java @@ -20,17 +20,14 @@ package org.apache.helix.messaging.handling; */ import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; -import org.apache.helix.api.listeners.ClusterConfigChangeListener; -import org.apache.helix.api.listeners.ResourceConfigChangeListener; +import org.apache.helix.*; import org.apache.helix.mock.MockManager; -import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageState; import org.testng.Assert; @@ -50,7 +47,7 @@ public class TestHelixTaskExecutor { class TestMessageHandlerFactory implements MessageHandlerFactory { int _handlersCreated = 0; - ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>(); + ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>(); class TestMessageHandler extends MessageHandler { public TestMessageHandler(Message message, NotificationContext context) { @@ -277,6 +274,7 @@ public class TestHelixTaskExecutor { msg.setCorrelationId(UUID.randomUUID().toString()); msgList.add(msg); } + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(1000); @@ -328,6 +326,7 @@ public class TestHelixTaskExecutor { msg.setSrcName("127.101.1.23_2234"); msgList.add(msg); } + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(1000); @@ -377,6 +376,7 @@ public class TestHelixTaskExecutor { msg.setTgtName(""); msgList.add(msg); } + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(1000); @@ -424,6 +424,7 @@ public class TestHelixTaskExecutor { exceptionMsg.setCorrelationId(UUID.randomUUID().toString()); msgList.add(exceptionMsg); + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(1000); @@ -466,6 +467,7 @@ public class TestHelixTaskExecutor { msg.setSrcName("127.101.1.23_2234"); msgListToCancel.add(msg); } + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(500); for (int i = 0; i < nMsgs2; i++) { @@ -535,6 +537,7 @@ public class TestHelixTaskExecutor { msgList.add(msg); } NotificationContext changeContext = new NotificationContext(manager); + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("some", msgList, changeContext); Thread.sleep(500); for (ExecutorService svc : executor._executorMap.values()) { @@ -572,6 +575,7 @@ public class TestHelixTaskExecutor { msg.setExecutionTimeout((i + 1) * 600); msgList.add(msg); } + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(4000); @@ -618,6 +622,7 @@ public class TestHelixTaskExecutor { msg.setRetryCount(1); msgList.add(msg); } + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(3500); AssertJUnit.assertEquals(factory._processedMsgIds.size(), 3); @@ -663,10 +668,54 @@ public class TestHelixTaskExecutor { msg2.setToState("MASTER"); msgList.add(msg2); + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(3000); AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0); AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0); } + + @Test + public void testMessageReadOptimization() throws InterruptedException { + HelixTaskExecutor executor = new HelixTaskExecutor(); + HelixManager manager = new MockClusterManager(); + + TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } + + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + List<String> messageIds = new ArrayList<>(); + int nMsgs1 = 5; + for (int i = 0; i < nMsgs1; i++) { + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); + msg.setTgtSessionId(manager.getSessionId()); + msg.setTgtName("Localhost_1123"); + msg.setSrcName("127.101.1.23_2234"); + msg.setCorrelationId(UUID.randomUUID().toString()); + accessor.setProperty(keyBuilder.message("someInstance", msg.getId()), msg); + messageIds.add(msg.getId()); + } + + NotificationContext changeContext = new NotificationContext(manager); + changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); + + // Simulate read message already, then processing message. Should read and handle no message. + executor._knownMessageIds.addAll(messageIds); + executor.onMessage("someInstance", Collections.EMPTY_LIST, changeContext); + Thread.sleep(3000); + AssertJUnit.assertEquals(0, factory._processedMsgIds.size()); + executor._knownMessageIds.clear(); + + // Processing message normally + executor.onMessage("someInstance", Collections.EMPTY_LIST, changeContext); + Thread.sleep(3000); + AssertJUnit.assertEquals(nMsgs1, factory._processedMsgIds.size()); + // After all messages are processed, _knownMessageIds should be empty. + Assert.assertTrue(executor._knownMessageIds.isEmpty()); + } }