Repository: helix Updated Branches: refs/heads/helix-0.6.x 015a73cef -> 384978a2e
Adding support to batch ZK callback optionally by setting sys var asyncBatchModeEnabled=true Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/384978a2 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/384978a2 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/384978a2 Branch: refs/heads/helix-0.6.x Commit: 384978a2e16ab0f4adb388e32c7e448c77996ca2 Parents: 015a73c Author: kishoreg <[email protected]> Authored: Fri Mar 24 10:48:05 2017 -0700 Committer: kishoreg <[email protected]> Committed: Fri Mar 24 10:48:05 2017 -0700 ---------------------------------------------------------------------- .../helix/manager/zk/CallbackHandler.java | 130 ++++++++++++------- 1 file changed, 86 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/384978a2/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index c3e8206..90df56d 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -31,6 +31,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import org.I0Itec.zkclient.IZkChildListener; @@ -53,7 +55,7 @@ import org.apache.helix.MessageListener; import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.PropertyPathConfig; import org.apache.helix.ScopedConfigChangeListener; import org.apache.helix.ZNRecord; import org.apache.helix.model.CurrentState; @@ -89,15 +91,19 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener private final AtomicLong _lastNotificationTimeStamp; private final HelixManager _manager; private final PropertyKey _propertyKey; - + BlockingQueue<NotificationContext> _queue = new LinkedBlockingQueue<NotificationContext>(1000); + private static boolean asyncBatchModeEnabled = false; + static { + asyncBatchModeEnabled = Boolean.parseBoolean(System.getProperty("isAsyncBatchModeEnabled")); + logger.info("isAsyncBatchModeEnabled: " + asyncBatchModeEnabled); + } /** * maintain the expected notification types * this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks */ private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE); - public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey, - Object listener, EventType[] eventTypes, ChangeType changeType) { + public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey, Object listener, EventType[] eventTypes, ChangeType changeType) { if (listener == null) { throw new HelixException("listener could not be null"); } @@ -111,6 +117,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener this._eventTypes = eventTypes; this._changeType = changeType; this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime()); + this._queue = new LinkedBlockingQueue<NotificationContext>(1000); + if (asyncBatchModeEnabled) { + new Thread(new CallbackInvoker(this)).start(); + } init(); } @@ -122,13 +132,58 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener return _path; } + class CallbackInvoker implements Runnable { + private CallbackHandler handler; + + CallbackInvoker(CallbackHandler handler) { + this.handler = handler; + } + + public void run() { + while (true) { + try { + NotificationContext notificationToProcess = _queue.take(); + int mergedCallbacks = 0; + // remove all elements in the queue that have the same type + while (true) { + NotificationContext nextItem = _queue.peek(); + if (nextItem != null && notificationToProcess.getType() == nextItem.getType()) { + notificationToProcess = _queue.take(); + mergedCallbacks++; + } else { + break; + } + } + try { + logger.info("Num callbacks merged for path:" + handler.getPath() + " : " + mergedCallbacks); + handler.invoke(notificationToProcess); + } catch (Exception e) { + logger.warn("Exception in callback processing thread. Skipping callback", e); + } + } catch (InterruptedException e) { + logger.warn("Interrupted exception in callback processing thread. Exiting thread, new callbacks will not be processed", e); + break; + } + } + } + } + + public void enqueueTask(NotificationContext changeContext) throws Exception { + //async mode only applicable to CALLBACK from ZK, During INIT and FINALIZE invoke the callback's immediately. + if (asyncBatchModeEnabled && changeContext.getType() != NotificationContext.Type.CALLBACK) { + logger.info("Enqueuing callback"); + _queue.put(changeContext); + } else { + invoke(changeContext); + } + } + public void invoke(NotificationContext changeContext) throws Exception { // This allows the listener to work with one change at a time synchronized (_manager) { Type type = changeContext.getType(); if (!_expectTypes.contains(type)) { - logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path - + ", expected types: " + _expectTypes + " but was " + type); + logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path + ", expected types: " + _expectTypes + " but was " + type); return; } _expectTypes = nextNotificationType.get(type); @@ -136,8 +191,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener // Builder keyBuilder = _accessor.keyBuilder(); long start = System.currentTimeMillis(); if (logger.isInfoEnabled()) { - logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:" - + _listener.getClass().getCanonicalName()); + logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:" + _listener.getClass().getCanonicalName()); } if (_changeType == IDEAL_STATE) { @@ -165,18 +219,16 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener List<HelixProperty> configs = _accessor.getChildValues(_propertyKey); listener.onConfigChange(configs, changeContext); } else if (_changeType == LIVE_INSTANCE) { - LiveInstanceChangeListener liveInstanceChangeListener = - (LiveInstanceChangeListener) _listener; + LiveInstanceChangeListener liveInstanceChangeListener = (LiveInstanceChangeListener) _listener; subscribeForChanges(changeContext, _path, true, true); List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey); liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext); } else if (_changeType == CURRENT_STATE) { - CurrentStateChangeListener currentStateChangeListener = - (CurrentStateChangeListener) _listener; + CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener) _listener; subscribeForChanges(changeContext, _path, true, true); - String instanceName = PropertyPathBuilder.getInstanceNameFromPath(_path); + String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path); List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey); @@ -185,7 +237,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener } else if (_changeType == MESSAGE) { MessageListener messageListener = (MessageListener) _listener; subscribeForChanges(changeContext, _path, true, false); - String instanceName = PropertyPathBuilder.getInstanceNameFromPath(_path); + String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path); List<Message> messages = _accessor.getChildValues(_propertyKey); messageListener.onMessage(instanceName, messages, changeContext); @@ -211,8 +263,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener long end = System.currentTimeMillis(); if (logger.isInfoEnabled()) { - logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:" - + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms"); + logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:" + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + + "ms"); } } } @@ -220,12 +272,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener private void subscribeChildChange(String path, NotificationContext context) { NotificationContext.Type type = context.getType(); if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) { - logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path - + ", listener: " + _listener); + logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: " + _listener); _zkClient.subscribeChildChanges(path, this); } else if (type == NotificationContext.Type.FINALIZE) { - logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path - + ", listener: " + _listener); + logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + _listener); _zkClient.unsubscribeChildChanges(path, this); } @@ -235,22 +285,20 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener NotificationContext.Type type = context.getType(); if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) { if (logger.isDebugEnabled()) { - logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path - + ", listener: " + _listener); + logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener: " + _listener); } _zkClient.subscribeDataChanges(path, this); } else if (type == NotificationContext.Type.FINALIZE) { - logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path - + ", listener: " + _listener); + logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: " + _listener); _zkClient.unsubscribeDataChanges(path, this); } } // TODO watchParent is always true. consider remove it - private void subscribeForChanges(NotificationContext context, String path, boolean watchParent, - boolean watchChild) { + private void subscribeForChanges(NotificationContext context, String path, boolean watchParent, boolean watchChild) { + long start = System.currentTimeMillis(); if (watchParent) { subscribeChildChange(path, context); } @@ -301,10 +349,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener } } } catch (ZkNoNodeException e) { - logger.warn("fail to subscribe child/data change. path: " + path + ", listener: " - + _listener, e); + logger.warn("fail to subscribe child/data change. path: " + path + ", listener: " + _listener, e); } } + long end = System.currentTimeMillis(); + logger.info("Subcribing to path:" + path + " took:" + (end - start)); } @@ -321,7 +370,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener try { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.INIT); - invoke(changeContext); + enqueueTask(changeContext); } catch (Exception e) { String msg = "Exception while invoking init callback for listener:" + _listener; ZKExceptionHandler.getInstance().handle(msg, e); @@ -335,11 +384,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener if (dataPath != null && dataPath.startsWith(_path)) { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.CALLBACK); - invoke(changeContext); + enqueueTask(changeContext); } } catch (Exception e) { - String msg = - "exception in handling data-change. path: " + dataPath + ", listener: " + _listener; + String msg = "exception in handling data-change. path: " + dataPath + ", listener: " + _listener; ZKExceptionHandler.getInstance().handle(msg, e); } } @@ -349,14 +397,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener try { updateNotificationTime(System.nanoTime()); if (dataPath != null && dataPath.startsWith(_path)) { - logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath - + ", listener: " + _listener); + logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath + ", listener: " + _listener); _zkClient.unsubscribeDataChanges(dataPath, this); // only needed for bucketized parent, but OK if we don't have child-change // watch on the bucketized parent path - logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath - + ", listener: " + _listener); + logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath + ", listener: " + _listener); _zkClient.unsubscribeChildChanges(dataPath, this); // No need to invoke() since this event will handled by child-change on parent-node // NotificationContext changeContext = new NotificationContext(_manager); @@ -364,9 +410,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener // invoke(changeContext); } } catch (Exception e) { - String msg = - "exception in handling data-delete-change. path: " + dataPath + ", listener: " - + _listener; + String msg = "exception in handling data-delete-change. path: " + dataPath + ", listener: " + _listener; ZKExceptionHandler.getInstance().handle(msg, e); } } @@ -384,13 +428,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener _manager.removeListener(_propertyKey, _listener); } else { changeContext.setType(NotificationContext.Type.CALLBACK); - invoke(changeContext); + enqueueTask(changeContext); } } } catch (Exception e) { - String msg = - "exception in handling child-change. instance: " + _manager.getInstanceName() - + ", parentPath: " + parentPath + ", listener: " + _listener; + String msg = "exception in handling child-change. instance: " + _manager.getInstanceName() + ", parentPath: " + parentPath + ", listener: " + _listener; ZKExceptionHandler.getInstance().handle(msg, e); } } @@ -402,7 +444,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener try { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.FINALIZE); - invoke(changeContext); + enqueueTask(changeContext); } catch (Exception e) { String msg = "Exception while resetting the listener:" + _listener; ZKExceptionHandler.getInstance().handle(msg, e);
