Repository: helix
Updated Branches:
  refs/heads/master baf383b37 -> ae8eb5969


[HELIX-695] add helix manager listener for new connection notification


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ae8eb596
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ae8eb596
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ae8eb596

Branch: refs/heads/master
Commit: ae8eb5969e8bf5cc72704e0ef316cd4259f3b461
Parents: baf383b
Author: Harry Zhang <[email protected]>
Authored: Mon Apr 16 10:05:30 2018 -0700
Committer: Harry Zhang <[email protected]>
Committed: Thu Apr 19 11:01:53 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |  3 ++
 .../helix/integration/TestZkReconnect.java      | 30 +++++++++++++-------
 2 files changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ae8eb596/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index d156c88..63a60c6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -661,6 +661,9 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     try {
       createClient();
       _messagingService.onConnected();
+      if (_stateListener != null) {
+        _stateListener.onConnected(this);
+      }
     } catch (Exception e) {
       LOG.error("fail to connect " + _instanceName, e);
       disconnect();

http://git-wip-us.apache.org/repos/asf/helix/blob/ae8eb596/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
index 85ecb0c..aa23257 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
@@ -142,7 +142,7 @@ public class TestZkReconnect {
   }
 
   @Test
-  public void testZKDisconnectCallback() throws Exception {
+  public void testHelixManagerStateListenerCallback() throws Exception {
     final int zkPort = TestHelper.getRandomPort();
     final String zkAddr = String.format("localhost:%d", zkPort);
     final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
@@ -151,8 +151,9 @@ public class TestZkReconnect {
     String methodName = TestHelper.getTestMethodName();
     final String clusterName = className + "_" + methodName;
 
-    // Init flag to check if callback is triggered
-    final AtomicReference<Boolean> flag = new AtomicReference<Boolean>(false);
+    // Init onDisconnectedFlag to check if callback is triggered
+    final AtomicReference<Boolean> onDisconnectedFlag = new 
AtomicReference<>(false);
+    final AtomicReference<Boolean> onConnectedFlag = new 
AtomicReference<>(false);
 
     // Setup cluster
     LOG.info("Setup clusters");
@@ -169,26 +170,32 @@ public class TestZkReconnect {
             new HelixManagerStateListener() {
               @Override
               public void onConnected(HelixManager helixManager) throws 
Exception {
-                return;
+                Assert.assertEquals(helixManager.getClusterName(), 
clusterName);
+                onConnectedFlag.getAndSet(true);
               }
 
               @Override
               public void onDisconnected(HelixManager helixManager, Throwable 
error) throws Exception {
                 Assert.assertEquals(helixManager.getClusterName(), 
clusterName);
-                flag.getAndSet(true);
+                onDisconnectedFlag.getAndSet(true);
               }
             });
 
     try {
       controller.connect();
+      Assert.assertTrue(onConnectedFlag.getAndSet(false));
       ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
 
       // 1. shutdown zkServer and check if handler trigger callback
       zkServer.shutdown();
-      // Retry will fail, and flag should be set within onDisconnected handler
+
+      // Retry will fail, and onDisconnectedFlag should be set within 
onDisconnected handler
       controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertTrue(flag.get());
+      Assert.assertTrue(onDisconnectedFlag.get());
+      Assert.assertFalse(onConnectedFlag.get());
+      Assert.assertFalse(controller.isConnected());
 
+      // Verify ZK is down
       try {
         propertyStore.get("/", null, 0);
         Assert.fail("propertyStore should be disconnected.");
@@ -198,11 +205,14 @@ public class TestZkReconnect {
       }
 
       // 2. restart zkServer and check if handler will recover connection
-      flag.getAndSet(false);
+      onDisconnectedFlag.getAndSet(false);
       zkServer.start();
-      // Retry will succeed, and flag should not be set
+
+      // Retry will succeed, and onDisconnectedFlag should not be set
       controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertFalse(flag.get());
+      Assert.assertFalse(onDisconnectedFlag.get());
+      Assert.assertTrue(onConnectedFlag.get());
+
       // New propertyStore should be in good state
       propertyStore = controller.getHelixPropertyStore();
       propertyStore.get("/", null, 0);

Reply via email to