Repository: helix Updated Branches: refs/heads/master baf383b37 -> ae8eb5969
[HELIX-695] add helix manager listener for new connection notification Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ae8eb596 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ae8eb596 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ae8eb596 Branch: refs/heads/master Commit: ae8eb5969e8bf5cc72704e0ef316cd4259f3b461 Parents: baf383b Author: Harry Zhang <[email protected]> Authored: Mon Apr 16 10:05:30 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Thu Apr 19 11:01:53 2018 -0700 ---------------------------------------------------------------------- .../apache/helix/manager/zk/ZKHelixManager.java | 3 ++ .../helix/integration/TestZkReconnect.java | 30 +++++++++++++------- 2 files changed, 23 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ae8eb596/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index d156c88..63a60c6 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -661,6 +661,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { try { createClient(); _messagingService.onConnected(); + if (_stateListener != null) { + _stateListener.onConnected(this); + } } catch (Exception e) { LOG.error("fail to connect " + _instanceName, e); disconnect(); http://git-wip-us.apache.org/repos/asf/helix/blob/ae8eb596/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java index 85ecb0c..aa23257 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java @@ -142,7 +142,7 @@ public class TestZkReconnect { } @Test - public void testZKDisconnectCallback() throws Exception { + public void testHelixManagerStateListenerCallback() throws Exception { final int zkPort = TestHelper.getRandomPort(); final String zkAddr = String.format("localhost:%d", zkPort); final ZkServer zkServer = TestHelper.startZkServer(zkAddr); @@ -151,8 +151,9 @@ public class TestZkReconnect { String methodName = TestHelper.getTestMethodName(); final String clusterName = className + "_" + methodName; - // Init flag to check if callback is triggered - final AtomicReference<Boolean> flag = new AtomicReference<Boolean>(false); + // Init onDisconnectedFlag to check if callback is triggered + final AtomicReference<Boolean> onDisconnectedFlag = new AtomicReference<>(false); + final AtomicReference<Boolean> onConnectedFlag = new AtomicReference<>(false); // Setup cluster LOG.info("Setup clusters"); @@ -169,26 +170,32 @@ public class TestZkReconnect { new HelixManagerStateListener() { @Override public void onConnected(HelixManager helixManager) throws Exception { - return; + Assert.assertEquals(helixManager.getClusterName(), clusterName); + onConnectedFlag.getAndSet(true); } @Override public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception { Assert.assertEquals(helixManager.getClusterName(), clusterName); - flag.getAndSet(true); + onDisconnectedFlag.getAndSet(true); } }); try { controller.connect(); + Assert.assertTrue(onConnectedFlag.getAndSet(false)); ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore(); // 1. shutdown zkServer and check if handler trigger callback zkServer.shutdown(); - // Retry will fail, and flag should be set within onDisconnected handler + + // Retry will fail, and onDisconnectedFlag should be set within onDisconnected handler controller.handleSessionEstablishmentError(new Exception("For testing")); - Assert.assertTrue(flag.get()); + Assert.assertTrue(onDisconnectedFlag.get()); + Assert.assertFalse(onConnectedFlag.get()); + Assert.assertFalse(controller.isConnected()); + // Verify ZK is down try { propertyStore.get("/", null, 0); Assert.fail("propertyStore should be disconnected."); @@ -198,11 +205,14 @@ public class TestZkReconnect { } // 2. restart zkServer and check if handler will recover connection - flag.getAndSet(false); + onDisconnectedFlag.getAndSet(false); zkServer.start(); - // Retry will succeed, and flag should not be set + + // Retry will succeed, and onDisconnectedFlag should not be set controller.handleSessionEstablishmentError(new Exception("For testing")); - Assert.assertFalse(flag.get()); + Assert.assertFalse(onDisconnectedFlag.get()); + Assert.assertTrue(onConnectedFlag.get()); + // New propertyStore should be in good state propertyStore = controller.getHelixPropertyStore(); propertyStore.get("/", null, 0);
