Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x fab5423f1 -> 70a585aca
Creating a separate threadpool to handle batchMessages Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/70a585ac Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/70a585ac Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/70a585ac Branch: refs/heads/helix-0.6.x Commit: 70a585aca1302aff767a91c59040ad9c94439323 Parents: fab5423 Author: kishoreg <[email protected]> Authored: Mon Apr 3 00:10:20 2017 -0700 Committer: kishoreg <[email protected]> Committed: Mon Apr 3 00:10:20 2017 -0700 ---------------------------------------------------------------------- .../messaging/handling/HelixTaskExecutor.java | 24 +++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/70a585ac/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index d68b272..8d3fea1 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -119,6 +119,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry; final ConcurrentHashMap<String, ExecutorService> _executorMap; + + /** + * separate executor for executing batch messages + */ + private final ExecutorService _batchMessageExecutorService; + /* Resources whose configuration for dedicate thread pool has been checked.*/ final Set<String> _resourcesThreadpoolChecked; @@ -126,6 +132,7 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor { // timer for schedule timeout tasks final Timer _timer; + public HelixTaskExecutor() { this(new ParticipantStatusMonitor(false, null)); } @@ -135,6 +142,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>(); _executorMap = new ConcurrentHashMap<String, ExecutorService>(); + _batchMessageExecutorService = Executors.newCachedThreadPool(); _resourcesThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); @@ -261,12 +269,16 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { ExecutorService findExecutorServiceForMsg(Message message) { ExecutorService executorService = _executorMap.get(message.getMsgType()); if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) { - String resourceName = message.getResourceName(); - if (resourceName != null) { - String key = message.getMsgType() + "." + resourceName; - if (_executorMap.containsKey(key)) { - LOG.info("Find per-resource thread pool with key: " + key); - executorService = _executorMap.get(key); + if(message.getBatchMessageMode() == true) { + executorService = _batchMessageExecutorService; + } else { + String resourceName = message.getResourceName(); + if (resourceName != null) { + String key = message.getMsgType() + "." + resourceName; + if (_executorMap.containsKey(key)) { + LOG.info("Find per-resource thread pool with key: " + key); + executorService = _executorMap.get(key); + } } } }
