Updated Branches:
  refs/heads/master cec747d68 -> d26addcf1

add test cases for zk callback handler leaking


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d26addcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d26addcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d26addcf

Branch: refs/heads/master
Commit: d26addcf159f484391d26eddaa7b695168ec8227
Parents: cec747d
Author: zzhang <[email protected]>
Authored: Thu May 2 12:53:39 2013 -0700
Committer: zzhang <[email protected]>
Committed: Thu May 2 12:53:39 2013 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java    |    2 +-
 .../test/java/org/apache/helix/TestZkBasis.java    |  161 ++++++++++++++
 .../test/java/org/apache/helix/ZkTestHelper.java   |   51 ++++-
 .../integration/TestZkCallbackHandlerLeak.java     |  165 ++++++++++++++-
 4 files changed, 368 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4095796..de93035 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -259,7 +259,7 @@ public class ZKHelixManager implements HelixManager
         // compare property-key path and listener reference
         if (handler.getPath().equals(propertyKey.getPath()) && 
handler.getListener().equals(listener))
         {
-          // TODO add log
+          logger.info("Listener: " + listener + " on path: " + 
propertyKey.getPath() + " already exists. skip adding it");
           return;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java 
b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
new file mode 100644
index 0000000..9d9c789
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
@@ -0,0 +1,161 @@
+package org.apache.helix;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * test zookeeper basis
+ */
+public class TestZkBasis extends ZkUnitTestBase {
+    class ZkListener implements  IZkDataListener, IZkChildListener {
+        String _parentPath = null;
+        String _dataDeletePath = null;
+        List<String> _currentChilds = Collections.emptyList();  // make sure 
it's set to null in #handleChildChange()
+
+        CountDownLatch _childChangeCountDown = new CountDownLatch(1);
+        CountDownLatch _dataDeleteCountDown = new CountDownLatch(1);
+
+        @Override
+        public void handleChildChange(String parentPath, List<String> 
currentChilds) throws Exception {
+            _parentPath = parentPath;
+            _currentChilds = currentChilds;
+            _childChangeCountDown.countDown();
+        }
+
+        @Override
+        public void handleDataChange(String dataPath, Object data) throws 
Exception {
+            //To change body of implemented methods use File | Settings | File 
Templates.
+        }
+
+        @Override
+        public void handleDataDeleted(String dataPath) throws Exception {
+            _dataDeletePath = dataPath;
+            _dataDeleteCountDown.countDown();
+        }
+    }
+    /**
+     * test zk watchers are renewed automatically after session expiry
+     *
+     * zookeeper-client side keeps all registered watchers see 
ZooKeeper.WatchRegistration.register()
+     * after session expiry, all watchers are renewed
+     * if a path that has watches on it has been removed during session expiry,
+     * the watchers on that path will still get callbacks after session 
renewal, especially:
+     *  a data-watch will get data-deleted callback
+     *  a child-watch will get a child-change callback with current-child-list 
= null
+     *
+     * this can be used for cleanup watchers on the zookeeper-client side
+     */
+    @Test
+    public void testWatchRenew() throws Exception {
+
+        String className = TestHelper.getTestClassName();
+        String methodName = TestHelper.getTestMethodName();
+        String testName = className + "_" + methodName;
+
+        final ZkClient client = new ZkClient(ZK_ADDR, 
ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        // make sure "/testName/test" doesn't exist
+        final String path = "/" + testName + "/test";
+        client.delete(path);
+
+        ZkListener listener = new ZkListener();
+        client.subscribeDataChanges(path, listener);
+        client.subscribeChildChanges(path, listener);
+
+        ZkTestHelper.expireSession(client);
+
+        boolean succeed = listener._childChangeCountDown.await(10, 
TimeUnit.SECONDS);
+        Assert.assertTrue(succeed, "fail to wait on child-change count-down in 
10 seconds after session-expiry");
+        Assert.assertEquals(listener._parentPath, path, "fail to get 
child-change callback after session-expiry");
+        Assert.assertNull(listener._currentChilds, "fail to get child-change 
callback with currentChilds=null after session expiry");
+
+        succeed = listener._dataDeleteCountDown.await(10, TimeUnit.SECONDS);
+        Assert.assertTrue(succeed, "fail to wait on data-delete count-down in 
10 seconds after session-expiry");
+        Assert.assertEquals(listener._dataDeletePath, path, "fail to get 
data-delete callback after session-expiry");
+
+        client.close();
+    }
+
+    /**
+     * after calling zkclient#unsubscribeXXXListener()
+     * an already registered watch will not be removed from 
ZooKeeper#watchManager#XXXWatches immediately.
+     * the watch will get removed on the following conditions:
+     *  1) there is a set/delete on the listening path via the zkclient
+     *  2) session expiry on the zkclient (i.e. the watch will not be renewed 
after session expiry)
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testWatchRemove() throws Exception {
+        String className = TestHelper.getTestClassName();
+        String methodName = TestHelper.getTestMethodName();
+        String testName = className + "_" + methodName;
+
+        final ZkClient client = new ZkClient(ZK_ADDR, 
ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        // make sure "/testName/test" doesn't exist
+        final String path = "/" + testName + "/test";
+        client.createPersistent(path, true);
+
+        ZkListener listener = new ZkListener();
+        client.subscribeDataChanges(path, listener);
+        client.subscribeChildChanges(path, listener);
+
+        // listener should be in both ZkClient#_dataListener and 
ZkClient#_childListener set
+        Map<String, Set<IZkDataListener>> dataListenerMap = 
ZkTestHelper.getZkDataListener(client);
+        Assert.assertEquals(dataListenerMap.size(), 1, "ZkClient#_dataListener 
should have 1 listener");
+        Set<IZkDataListener> dataListenerSet = dataListenerMap.get(path);
+        Assert.assertNotNull(dataListenerSet, "ZkClient#_dataListener should 
have 1 listener on path: " + path);
+        Assert.assertEquals(dataListenerSet.size(), 1, "ZkClient#_dataListener 
should have 1 listener on path: " + path);
+
+
+        Map<String, Set<IZkChildListener>> childListenerMap = 
ZkTestHelper.getZkChildListener(client);
+        Assert.assertEquals(childListenerMap.size(), 1, 
"ZkClient#_childListener should have 1 listener");
+        Set<IZkChildListener> childListenerSet = childListenerMap.get(path);
+        Assert.assertNotNull(childListenerSet, "ZkClient#_childListener should 
have 1 listener on path: " + path);
+        Assert.assertEquals(childListenerSet.size(), 1, 
"ZkClient#_childListener should have 1 listener on path: " + path);
+
+        // watch should be in ZooKeeper#watchManager#XXXWatches
+        Map<String, List<String>> watchMap = ZkTestHelper.getZkWatch(client);
+        // System.out.println("watchMap1: " + watchMap);
+        List<String> dataWatch = watchMap.get("dataWatches");
+        Assert.assertNotNull(dataWatch, "ZooKeeper#watchManager#dataWatches 
should have 1 data watch on path: " + path);
+        Assert.assertEquals(dataWatch.size(), 1, 
"ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+        Assert.assertEquals(dataWatch.get(0), path, 
"ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+
+        List<String> childWatch = watchMap.get("childWatches");
+        Assert.assertNotNull(childWatch, "ZooKeeper#watchManager#childWatches 
should have 1 child watch on path: " + path);
+        Assert.assertEquals(childWatch.size(), 1, 
"ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + 
path);
+        Assert.assertEquals(childWatch.get(0), path, 
"ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + 
path);
+
+
+        client.unsubscribeDataChanges(path, listener);
+        client.unsubscribeChildChanges(path, listener);
+        // System.out.println("watchMap2: " + watchMap);
+        ZkTestHelper.expireSession(client);
+
+        // after session expiry, those watches should be removed
+        watchMap = ZkTestHelper.getZkWatch(client);
+        // System.out.println("watchMap3: " + watchMap);
+        dataWatch = watchMap.get("dataWatches");
+        Assert.assertTrue(dataWatch.isEmpty(), 
"ZooKeeper#watchManager#dataWatches should be empty");
+        childWatch = watchMap.get("childWatches");
+        Assert.assertTrue(childWatch.isEmpty(), 
"ZooKeeper#watchManager#childWatches should be empty");
+
+        client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java 
b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index a5b5681..37b0664 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -23,14 +23,12 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.InstanceType;
@@ -349,6 +347,49 @@ public class ZkTestHelper
         }
     }
 
+    public static Map<String, List<String>> getZkWatch(ZkClient client) throws 
Exception {
+        Map<String, List<String>> lists = new HashMap<String, List<String>>();
+        ZkConnection connection = ((ZkConnection) client.getConnection());
+        ZooKeeper zk = connection.getZookeeper();
+
+        java.lang.reflect.Field field = getField(zk.getClass(), 
"watchManager");
+        field.setAccessible(true);
+        Object watchManager = field.get(zk);
+
+        java.lang.reflect.Field field2 = getField(watchManager.getClass(), 
"dataWatches");
+        field2.setAccessible(true);
+        HashMap<String, Set<Watcher>> dataWatches = (HashMap<String, 
Set<Watcher>>) field2.get(watchManager);
+
+        field2 = getField(watchManager.getClass(), "existWatches");
+        field2.setAccessible(true);
+        HashMap<String, Set<Watcher>> existWatches = (HashMap<String, 
Set<Watcher>>) field2.get(watchManager);
+
+        field2 = getField(watchManager.getClass(), "childWatches");
+        field2.setAccessible(true);
+        HashMap<String, Set<Watcher>> childWatches = (HashMap<String, 
Set<Watcher>>) field2.get(watchManager);
+
+        lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
+        lists.put("existWatches", new 
ArrayList<String>(existWatches.keySet()));
+        lists.put("childWatches", new 
ArrayList<String>(childWatches.keySet()));
+
+        return lists;
+    }
+
+    public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient 
client) throws Exception {
+        java.lang.reflect.Field field = getField(client.getClass(), 
"_dataListener");
+        field.setAccessible(true);
+        Map<String, Set<IZkDataListener>> dataListener = (Map<String, 
Set<IZkDataListener>>)field.get(client);
+        return dataListener;
+    }
+
+    public static Map<String, Set<IZkChildListener>> 
getZkChildListener(ZkClient client) throws Exception {
+        java.lang.reflect.Field field = getField(client.getClass(), 
"_childListener");
+        field.setAccessible(true);
+        Map<String, Set<IZkChildListener>> childListener = (Map<String, 
Set<IZkChildListener>>)field.get(client);
+        return childListener;
+    }
+
+
     public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws 
Exception {
         java.lang.reflect.Field field = getField(zkclient.getClass(), 
"_eventThread");
         field.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 6359434..6b0488f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -24,13 +24,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.ZkUnitTestBase;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.*;
 import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -303,7 +304,134 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
                    + new Date(System.currentTimeMillis()));
        }
 
-       // debug
+
+    @Test
+    public void testRemoveUserCbHandlerOnPathRemoval() throws  Exception {
+        String className = TestHelper.getTestClassName();
+        String methodName = TestHelper.getTestMethodName();
+        String clusterName = className + "_" + methodName;
+        final int n = 3;
+        final String zkAddr = ZK_ADDR;
+        System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+        TestHelper.setupCluster(clusterName, zkAddr, 12918,
+                "localhost",
+                "TestDB",
+                1,  // resource
+                32, // partitions
+                n,  // nodes
+                2,  // replicas
+                "MasterSlave",
+                true);
+
+        ClusterController controller = new ClusterController(clusterName, 
"controller_0", zkAddr);
+        controller.syncStart();
+
+        MockParticipant[] participants = new MockParticipant[n];
+        for (int i = 0; i < n; i++) {
+            String instanceName = "localhost_" + (12918 + i);
+            participants[i] = new MockParticipant(clusterName, instanceName, 
zkAddr, null);
+            participants[i].syncStart();
+
+            // register a controller listener on participant_0
+            if (i == 0) {
+                ZkHelixTestManager manager = participants[0].getManager();
+                manager.addCurrentStateChangeListener(new 
CurrentStateChangeListener() {
+                    @Override
+                    public void onStateChange(String instanceName, 
List<CurrentState> statesInfo, NotificationContext changeContext) {
+                        //To change body of implemented methods use File | 
Settings | File Templates.
+                        System.out.println(instanceName + " on current-state 
change, type: " + changeContext.getType());
+                    }
+                }, manager.getInstanceName(), manager.getSessionId());
+            }
+        }
+
+        Boolean result = ClusterStateVerifier.verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+        Assert.assertTrue(result);
+
+        ZkHelixTestManager participantToExpire = participants[0].getManager();
+        String oldSessionId = participantToExpire.getSessionId();
+        PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+
+        // check manager#hanlders
+        Assert.assertEquals(participantToExpire.getHandlers().size(), 3, 
"Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+
+        // check zkclient#listeners
+        Map<String, Set<IZkDataListener>> dataListeners = 
ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+        Map<String, Set<IZkChildListener>> childListeners = 
ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+        // printZkListeners(participantToExpire.getZkClient());
+        Assert.assertEquals(dataListeners.size(), 1, "Should have 1 path 
(CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
+        String path = 
keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, 
"TestDB0").getPath();
+        Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 
data-listeners on path: " + path);
+        Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths 
(CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 
child-listener");
+        path = keyBuilder.currentStates(participantToExpire.getInstanceName(), 
oldSessionId).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 
child-listener on path: " + path);
+        path = 
keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 
child-listener on path: " + path);
+        path = keyBuilder.controller().getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 
child-listener on path: " + path);
+
+        // check zookeeper#watches on client side
+        Map<String, List<String>> watchPaths = 
ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+        // System.out.println("localhost_12918 zk-client side watchPaths: " + 
watchPaths + "\n");
+        Assert.assertEquals(watchPaths.get("dataWatches").size(), 4, "Should 
have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, 
CONTROLLER, MESSAGES");
+        Assert.assertEquals(watchPaths.get("childWatches").size(), 3, "Should 
have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+
+
+        // expire localhost_12918
+        System.out.println("Expire participant: " + 
participantToExpire.getInstanceName() + ", session: " + 
participantToExpire.getSessionId());
+        ZkTestHelper.expireSession(participantToExpire.getZkClient());
+        String newSessionId = participantToExpire.getSessionId();
+        System.out.println(participantToExpire.getInstanceName() + " 
oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+        result = ClusterStateVerifier.verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+        Assert.assertTrue(result);
+
+        // check manager#hanlders
+        Assert.assertEquals(participantToExpire.getHandlers().size(), 2, 
"Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} 
handler should be removed by CallbackHandler#handleChildChange()");
+
+        // check zkclient#listeners
+        dataListeners = 
ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+        childListeners = 
ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+        // printZkListeners(participantToExpire.getZkClient());
+        Assert.assertTrue(dataListeners.isEmpty(), "Should have no 
data-listeners");
+        Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths 
(CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
+                + "CONTROLLER and MESSAGE has 1 child-listener each. 
CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove 
empty childListener set. probably a ZkClient bug. see 
ZkClient#unsubscribeChildChange())");
+        path = keyBuilder.currentStates(participantToExpire.getInstanceName(), 
oldSessionId).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 0, "Should have 
no child-listener on path: " + path);
+        path = 
keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 
child-listener on path: " + path);
+        path = keyBuilder.controller().getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 
child-listener on path: " + path);
+
+        // check zookeeper#watches on client side
+        watchPaths = 
ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+        // System.out.println("localhost_12918 zk-client side watchPaths: " + 
watchPaths + "\n");
+        Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should 
have 2 data-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should 
have 2 child-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("existWatches").size(), 2, "Should 
have 2 exist-watches: CURRENTSTATE/{oldSessionId} and 
CURRENTSTATE/{oldSessionId}/TestDB0");
+
+        // another session expiry on localhost_12918 should clear the two 
exist-watches on CURRENTSTATE/{oldSessionId}
+        System.out.println("Expire participant: " + 
participantToExpire.getInstanceName() + ", session: " + 
participantToExpire.getSessionId());
+        ZkTestHelper.expireSession(participantToExpire.getZkClient());
+        result = ClusterStateVerifier.verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+        Assert.assertTrue(result);
+
+        // check zookeeper#watches on client side
+        watchPaths = 
ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+        // System.out.println("localhost_12918 zk-client side watchPaths: " + 
watchPaths + "\n");
+        Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should 
have 2 data-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should 
have 2 child-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("existWatches").size(), 0, "Should 
have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and 
CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
+
+        // Thread.sleep(1000);
+        System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+    }
+
+       // debug code
        static String printHandlers(ZkHelixTestManager manager) 
        {
                StringBuilder sb = new StringBuilder();
@@ -322,4 +450,31 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
            
            return sb.toString();
     }
+
+    void printZkListeners(ZkClient client) throws  Exception{
+        Map<String, Set<IZkDataListener>> datalisteners = 
ZkTestHelper.getZkDataListener(client);
+        Map<String, Set<IZkChildListener>> childListeners = 
ZkTestHelper.getZkChildListener(client);
+
+        System.out.println("dataListeners {");
+        for (String path : datalisteners.keySet()) {
+            System.out.println("\t" + path + ": ");
+            Set<IZkDataListener> set = datalisteners.get(path);
+            for (IZkDataListener listener : set) {
+                CallbackHandler handler = (CallbackHandler)listener;
+                System.out.println("\t\t" + handler.getListener());
+            }
+        }
+        System.out.println("}");
+
+        System.out.println("childListeners {");
+        for (String path : childListeners.keySet()) {
+            System.out.println("\t" + path + ": ");
+            Set<IZkChildListener> set = childListeners.get(path);
+            for (IZkChildListener listener : set) {
+                CallbackHandler handler = (CallbackHandler)listener;
+                System.out.println("\t\t" + handler.getListener());
+            }
+        }
+        System.out.println("}");
+    }
 }

Reply via email to