Repository: helix Updated Branches: refs/heads/master f99d9477f -> 82cfd15b5
Fix race-condition issue that could block ZkClient event thread in CallbackHandler. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/82cfd15b Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/82cfd15b Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/82cfd15b Branch: refs/heads/master Commit: 82cfd15b5b324674f4f5c3a50e156378460228c5 Parents: f99d947 Author: Lei Xia <[email protected]> Authored: Mon Apr 16 11:38:26 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Thu Apr 19 11:36:17 2018 -0700 ---------------------------------------------------------------------- .../helix/common/DedupEventProcessor.java | 4 +- .../helix/manager/zk/CallbackHandler.java | 205 ++++++++++--------- .../manager/zk/zookeeper/ZkEventThread.java | 2 + 3 files changed, 109 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/82cfd15b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java index b656364..7f3525b 100644 --- a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java +++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java @@ -52,7 +52,7 @@ public abstract class DedupEventProcessor<T, E> extends Thread { logger.error(_processorName + " thread failed while running the controller pipeline", t); } } - logger.info("END " + _processorName + " thread"); + logger.info("END " + _processorName + " thread for cluster " + _clusterName); } protected abstract void handleEvent(E event); @@ -62,7 +62,7 @@ public abstract class DedupEventProcessor<T, E> extends Thread { } public void shutdown() { - _eventQueue.clear(); this.interrupt(); + _eventQueue.clear(); } } http://git-wip-us.apache.org/repos/asf/helix/blob/82cfd15b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 22a1a46..5890fb8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -27,8 +27,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; @@ -87,6 +85,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT)); } + // processor to handle async zk event resubscription. + private static DedupEventProcessor SubscribeChangeEventProcessor; + private final String _path; private final Object _listener; private final Set<EventType> _eventTypes; @@ -96,24 +97,36 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { private final AtomicLong _lastNotificationTimeStamp; private final HelixManager _manager; private final PropertyKey _propertyKey; - BlockingQueue<NotificationContext> _queue = new LinkedBlockingQueue<>(1000); private boolean _batchModeEnabled = false; private boolean _preFetchEnabled = true; private HelixCallbackMonitor _monitor; - private Thread _batchProcessThread; // TODO: change this to use DedupEventProcessor - Lei. + + // TODO: make this be per _manager or per _listener instaed of per callbackHandler -- Lei + private CallbackProcessor _batchCallbackProcessor; private boolean _watchChild = true; // Whether we should subscribe to the child znode's data change. - private static DedupEventProcessor SubscribeChangeEventProcessor; + + // indicated whether this CallbackHandler is ready to serve event callback from ZkClient. + private boolean _ready = false; static { SubscribeChangeEventProcessor = - new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>("", + new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>("Singleton", "CallbackHanlder-AsycSubscribe") { - @Override protected void handleEvent(SubscribeChangeEvent event) { - logger.info("Resubscribe change to " + event.path + " for listener " + event.listener); + @Override + protected void handleEvent(SubscribeChangeEvent event) { + logger.info("Resubscribe change listener to path: " + event.path + ", for listener: " + + event.listener + ", watchChild: " + event.watchChild); try { - event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild); + if (event.handler.isReady()) { + event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild); + } else { + logger.info( + "CallbackHandler is not ready, stop subscribing changes listener to path: " + + event.path + ", for listener: " + event.listener + ", watchChild: " + + event.watchChild); + } } catch (Exception e) { - logger.error("Failed to resubscribe change to " + event.path + " for listener " + logger.error("Failed to resubscribe change to path: " + event.path + " for listener " + event.listener, e); } } @@ -139,6 +152,25 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { } } + class CallbackProcessor + extends DedupEventProcessor<NotificationContext.Type, NotificationContext> { + private CallbackHandler _handler; + + public CallbackProcessor(CallbackHandler handler) { + super(_manager.getClusterName(), "CallbackProcessor"); + _handler = handler; + } + + @Override + protected void handleEvent(NotificationContext event) { + try { + _handler.invoke(event); + } catch (Exception e) { + logger.warn("Exception in callback processing thread. Skipping callback", e); + } + } + } + /** * maintain the expected notification types * this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks @@ -172,7 +204,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { _eventTypes = new HashSet<>(Arrays.asList(eventTypes)); _changeType = changeType; _lastNotificationTimeStamp = new AtomicLong(System.nanoTime()); - _queue = new LinkedBlockingQueue<>(1000); _monitor = monitor; if (_changeType == MESSAGE || _changeType == MESSAGES_CONTROLLER || _changeType == CONTROLLER) { @@ -183,9 +214,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { parseListenerProperties(); - logger.info("isAsyncBatchModeEnabled: " + _batchModeEnabled); - logger.info("isPreFetchEnabled: " + _preFetchEnabled); - init(); } @@ -194,9 +222,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class); PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class); - String asyncBatchModeEnabled = System.getProperty("helix.callbackhandler.isAsyncBatchModeEnabled"); + String asyncBatchModeEnabled = + System.getProperty("helix.callbackhandler.isAsyncBatchModeEnabled"); if (asyncBatchModeEnabled == null) { - // for backcompatible, the old property name is deprecated. + // for back-compatible, the old property name is deprecated. asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled"); } @@ -279,62 +308,22 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { return _path; } - class CallbackProcessor extends Thread { - private CallbackHandler _handler; - - CallbackProcessor(CallbackHandler handler) { - super("CallbackHandler-batchprocess"); - _handler = handler; - } - - public void run() { - logger.info( - "start batch callback handle thread for path:" + _handler.getPath() + ", listener: " - + _listener); - while (!Thread.interrupted()) { - try { - NotificationContext notificationToProcess = _queue.take(); - int mergedCallbacks = 0; - // remove all elements in the queue that have the same type - while (true) { - NotificationContext nextItem = _queue.peek(); - if (nextItem != null && notificationToProcess.getType() == nextItem.getType()) { - notificationToProcess = _queue.take(); - mergedCallbacks++; - } else { - break; - } - } - try { - logger.info( - "Num callbacks merged for path:" + _handler.getPath() + " : " + mergedCallbacks); - _handler.invoke(notificationToProcess); - } catch (Exception e) { - logger.warn("Exception in callback processing thread. Skipping callback", e); - } - } catch (InterruptedException e) { - logger.warn( - "Interrupted exception in callback processing thread. Exiting thread for listener " - + _listener + ", new callbacks will not be processed", e); - break; - } - } - - logger.warn( - "Exiting batch callback processing thread for listener : " + _listener + ", path: " - + _path); - } - } - public void enqueueTask(NotificationContext changeContext) throws Exception { //async mode only applicable to CALLBACK from ZK, During INIT and FINALIZE invoke the callback's immediately. if (_batchModeEnabled && changeContext.getType() == NotificationContext.Type.CALLBACK) { logger.debug("Enqueuing callback"); - _queue.put(changeContext); + if (!isReady()) { + logger.info( + "CallbackHandler is not ready, ignore change callback from path: " + + _path + ", for listener: " + _listener); + } else { + _batchCallbackProcessor.queueEvent(changeContext.getType(), changeContext); + } } else { invoke(changeContext); } + if (_monitor != null) { _monitor.increaseCallbackUnbatchedCounters(); } @@ -367,7 +356,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { // put SubscribeForChange run in async thread to reduce the latency of zk callback handling. subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild); } - _expectTypes = nextNotificationType.get(type); if (_changeType == IDEAL_STATE) { IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener; @@ -464,11 +452,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { } _zkClient.subscribeChildChanges(path, this); } else if (callbackType == NotificationContext.Type.FINALIZE) { - if (logger.isDebugEnabled()) { - logger.debug( - _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " - + _listener); - } + logger.info( + _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + _listener); + _zkClient.unsubscribeChildChanges(path, this); } } @@ -483,11 +469,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { } _zkClient.subscribeDataChanges(path, this); } else if (callbackType == NotificationContext.Type.FINALIZE) { - if (logger.isDebugEnabled()) { - logger.debug( - _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: " - + _listener); - } + logger.info( + _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: " + + _listener); + _zkClient.unsubscribeDataChanges(path, this); } } @@ -501,21 +486,23 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { private void subscribeForChanges(NotificationContext.Type callbackType, String path, boolean watchChild) { + logger.info( + "Subscribing changes listener to path: " + path + ", type: " + callbackType + ", listener: " + + _listener); + long start = System.currentTimeMillis(); if (_eventTypes.contains(EventType.NodeDataChanged) || _eventTypes .contains(EventType.NodeCreated) || _eventTypes.contains(EventType.NodeDeleted)) { - logger.info("Subscribing data change listener to path:" + path + " for listener: " + _listener); + logger.info("Subscribing data change listener to path: " + path); subscribeDataChange(path, callbackType); } if (_eventTypes.contains(EventType.NodeChildrenChanged)) { - logger.info( - "Subscribing child change listener to path:" + path + " for listener: " + _listener); + logger.info("Subscribing child change listener to path:" + path); subscribeChildChange(path, callbackType); if (watchChild) { - logger.info( - "Subscribing data change listener to all children for path:" + path + " for listener: " - + _listener); + logger.info("Subscribing data change listener to all children for path:" + path); + try { switch (_changeType) { case CURRENT_STATE: @@ -581,9 +568,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { * exists */ public void init() { + logger.info("initializing CallbackHandler: " + this.toString() + " content: " + getContent()); + if (_batchModeEnabled) { - _batchProcessThread = new CallbackProcessor(this); - _batchProcessThread.start(); + if (_batchCallbackProcessor != null) { + _batchCallbackProcessor.shutdown(); + } + _batchCallbackProcessor = new CallbackProcessor(this); + _batchCallbackProcessor.start(); } updateNotificationTime(System.nanoTime()); @@ -591,7 +583,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.INIT); changeContext.setChangeType(_changeType); - enqueueTask(changeContext); + _ready = true; + invoke(changeContext); } catch (Exception e) { String msg = "Exception while invoking init callback for listener:" + _listener; ZKExceptionHandler.getInstance().handle(msg, e); @@ -628,18 +621,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { try { updateNotificationTime(System.nanoTime()); if (dataPath != null && dataPath.startsWith(_path)) { - if (logger.isDebugEnabled()) { - logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath - + ", listener: " + _listener); - } + logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath + + ", listener: " + _listener); _zkClient.unsubscribeDataChanges(dataPath, this); // only needed for bucketized parent, but OK if we don't have child-change // watch on the bucketized parent path - if (logger.isDebugEnabled()) { - logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath - + ", listener: " + _listener); - } + logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath + + ", listener: " + _listener); _zkClient.unsubscribeChildChanges(dataPath, this); // No need to invoke() since this event will handled by child-change on parent-node } @@ -654,10 +643,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { public void handleChildChange(String parentPath, List<String> currentChilds) { if (logger.isDebugEnabled()) { logger.debug( - "Data change callback: child changed, path: " + parentPath + ", current child count: " + ( - currentChilds != null - ? currentChilds.size() - : 0)); + "Data change callback: child changed, path: " + parentPath + ", current child count: " + + currentChilds.size()); } try { @@ -687,14 +674,16 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { * Invoke the listener for the last time so that the listener could clean up resources */ public void reset() { + logger.info("Resetting CallbackHandler: " + this.toString()); try { + _ready = false; + if (_batchCallbackProcessor != null) { + _batchCallbackProcessor.shutdown(); + } NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.FINALIZE); changeContext.setChangeType(_changeType); - enqueueTask(changeContext); - if (_batchProcessThread != null) { - _batchProcessThread.interrupt(); - } + invoke(changeContext); } catch (Exception e) { String msg = "Exception while resetting the listener:" + _listener; ZKExceptionHandler.getInstance().handle(msg, e); @@ -713,4 +702,20 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { } } + public boolean isReady() { + return _ready; + } + + public String getContent() { + return "CallbackHandler{" + + "_watchChild=" + _watchChild + + ", _preFetchEnabled=" + _preFetchEnabled + + ", _batchModeEnabled=" + _batchModeEnabled + + ", _path='" + _path + '\'' + + ", _listener=" + _listener + + ", _changeType=" + _changeType + + ", _manager=" + _manager + + ", _zkClient=" + _zkClient + + '}'; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/82cfd15b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java index a16f27c..8572191 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java @@ -82,6 +82,8 @@ public class ZkEventThread extends Thread { } catch (InterruptedException e) { LOG.info("Terminate ZkClient event thread."); } + + LOG.info("Terminate ZkClient event thread."); } public void send(ZkEvent event) {
