Repository: helix Updated Branches: refs/heads/master d602da9dd -> cdc3b8d60
Allow CallbackHandler to enable batch callback handling through either a Java system property or a class annotation. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2dbd88ff Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2dbd88ff Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2dbd88ff Branch: refs/heads/master Commit: 2dbd88ffed2eb84697225a58e8831b740ecd5919 Parents: d602da9 Author: Lei Xia <[email protected]> Authored: Fri Mar 2 14:20:14 2018 -0800 Committer: Lei Xia <[email protected]> Committed: Fri Mar 2 14:51:44 2018 -0800 ---------------------------------------------------------------------- .../helix/manager/zk/CallbackHandler.java | 7 ++ .../helix/manager/zk/ZkCallbackCache.java | 2 +- .../helix/TestListenerCallbackBatchMode.java | 110 ++++++++++++++----- 3 files changed, 92 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 701cf9f..42adade 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -152,6 +153,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class); PreFetch preFetch = 
_listener.getClass().getAnnotation(PreFetch.class); + String asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled"); + if (asyncBatchModeEnabled != null) { + _batchModeEnabled = Boolean.parseBoolean(asyncBatchModeEnabled); + logger.info("isAsyncBatchModeEnabled by default: " + _batchModeEnabled); + } + if (batchMode != null) { _batchModeEnabled = batchMode.enabled(); } http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java index 530d9d9..5b82242 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java @@ -57,7 +57,7 @@ public class ZkCallbackCache<T> extends Cache<T> implements IZkChildListener, IZ _accessor = accessor; _chrootPath = chrootPath; - _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>(); + _listener = new ConcurrentHashMap<>(); _eventThread = eventThread; // init cache http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java index 328098c..8cc3b0f 100644 --- a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java +++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java @@ -49,7 +49,8 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase { } } - @Override public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, + 
@Override + public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { if (context.getType().equals(NotificationContext.Type.CALLBACK)) { _instanceConfigChangedCount++; @@ -80,6 +81,14 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase { } } + @BatchMode (enabled = false) + class BatchDisableddListener extends Listener { + @Override + public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext) { + super.onIdealStateChange(idealState, changeContext); + } + } + private HelixManager _manager; private int _numNode = 8; @@ -122,21 +131,45 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase { final Listener listener = new Listener(); addListeners(listener); + updateConfigs(); + verifyNonbatchedListeners(listener); + removeListeners(listener); + System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis())); + } + + @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"}) + public void testEnableBatchedListenerByJavaProperty() throws Exception { + String methodName = TestHelper.getTestMethodName(); + System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis())); + + System.setProperty("isAsyncBatchModeEnabled", "true"); + + final Listener listener = new Listener(); + addListeners(listener); updateConfigs(); + verifyBatchedListeners(listener); - Boolean result = TestHelper.verify(new TestHelper.Verifier() { - @Override public boolean verify() { - return (listener._instanceConfigChangedCount == _numNode) && ( - listener._idealStateChangedCount == _numResource); - } - }, 12000); + System.setProperty("isAsyncBatchModeEnabled", "false"); + removeListeners(listener); - Thread.sleep(50); + System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis())); + } - Assert.assertTrue(result, - "non batched: instance: " + listener._instanceConfigChangedCount 
+ ", idealstate: " - + listener._idealStateChangedCount + "\nbatched: instance: "); + @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"}) + public void testDisableBatchedListenerByAnnotation() throws Exception { + String methodName = TestHelper.getTestMethodName(); + System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis())); + + System.setProperty("isAsyncBatchModeEnabled", "true"); + + final Listener listener = new BatchDisableddListener(); + addListeners(listener); + updateConfigs(); + verifyNonbatchedListeners(listener); + + System.setProperty("isAsyncBatchModeEnabled", "false"); + removeListeners(listener); System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis())); } @@ -148,17 +181,9 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase { final BatchedListener batchListener = new BatchedListener(); addListeners(batchListener); - updateConfigs(); - - Thread.sleep(4000); - - boolean result = (batchListener._instanceConfigChangedCount < _numNode/2) && ( - batchListener._idealStateChangedCount < _numResource/2); - - Assert.assertTrue(result, - "batched: instance: " + batchListener._instanceConfigChangedCount + ", idealstate: " - + batchListener._idealStateChangedCount); + verifyBatchedListeners(batchListener); + removeListeners(batchListener); System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis())); } @@ -170,26 +195,59 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase { final MixedListener mixedListener = new MixedListener(); addListeners(mixedListener); - updateConfigs(); Thread.sleep(4000); - boolean result = (mixedListener._instanceConfigChangedCount == _numNode) && ( mixedListener._idealStateChangedCount < _numResource/2); - Assert.assertTrue(result, - "Mixed: instance: " + mixedListener._instanceConfigChangedCount + ", idealstate: " - + mixedListener._idealStateChangedCount); + 
Assert.assertTrue(result, "instance callbacks: " + mixedListener._instanceConfigChangedCount + + ", idealstate callbacks " + mixedListener._idealStateChangedCount + "\ninstance count: " + + _numNode + ", idealstate counts: " + _numResource); + + removeListeners(mixedListener); System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis())); } + private void verifyNonbatchedListeners(final Listener listener) throws Exception { + Boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() { + return (listener._instanceConfigChangedCount == _numNode) && ( + listener._idealStateChangedCount == _numResource); + } + }, 12000); + + Thread.sleep(50); + Assert.assertTrue(result, + "instance callbacks: " + listener._instanceConfigChangedCount + ", idealstate callbacks " + + listener._idealStateChangedCount + "\ninstance count: " + _numNode + + ", idealstate counts: " + _numResource); + } + + private void verifyBatchedListeners(Listener batchListener) throws InterruptedException { + Thread.sleep(3000); + boolean result = (batchListener._instanceConfigChangedCount < _numNode / 2) && ( + batchListener._idealStateChangedCount < _numResource / 2); + + Assert.assertTrue(result, "instance callbacks: " + batchListener._instanceConfigChangedCount + + ", idealstate callbacks " + batchListener._idealStateChangedCount + "\ninstance count: " + + _numNode + ", idealstate counts: " + _numResource); + + } + private void addListeners(Listener listener) throws Exception { _manager.addInstanceConfigChangeListener(listener); _manager.addIdealStateChangeListener(listener); } + private void removeListeners(Listener listener) throws Exception { + _manager.removeListener(new PropertyKey.Builder(_manager.getClusterName()).instanceConfigs(), + listener); + _manager + .removeListener(new PropertyKey.Builder(_manager.getClusterName()).idealStates(), listener); + } + private void updateConfigs() throws InterruptedException { final Random r = 
new Random(System.currentTimeMillis()); // test change content
