This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1758f2d  Fix unstable test TestZkCallbackHandlerLeak. (#1734)
1758f2d is described below

commit 1758f2d3b40270205efd54eacc24f0ed8c13405a
Author: Jiajun Wang <[email protected]>
AuthorDate: Wed May 12 10:26:45 2021 -0700

    Fix unstable test TestZkCallbackHandlerLeak. (#1734)
    
    Extend the wait and check section to wait for the expected condition with
    more tolerance.
---
 .../integration/TestZkCallbackHandlerLeak.java     | 195 ++++++---------------
 1 file changed, 54 insertions(+), 141 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 7bc92dc..d9f8707 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -25,9 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.AccessOption;
-import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
@@ -43,7 +41,6 @@ import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.helix.tools.ClusterStateVerifier;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -58,7 +55,6 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase 
{
 
   @Test
   public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception {
-    // Logger.getRootLogger().setLevel(Level.INFO);
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -106,39 +102,26 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     final MockParticipantManager participantManagerToExpire = participants[1];
 
     // check controller zk-watchers
-    boolean result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
-        // Set<String> watchPaths = watchers.get("0x" + 
controllerManager.getSessionId());
-        Set<String> watchPaths = watchers.get("0x" + 
controller.getSessionId());
-        // System.out.println("controller watch paths: " + watchPaths);
+    boolean result = TestHelper.verify(() -> {
+      Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
+      Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
 
-        // where n is number of nodes and r is number of resources
-        return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * n);
-      }
+      // where n is number of nodes and r is number of resources
+      return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * n);
     }, 2000);
     Assert.assertTrue(result, "Controller has incorrect number of 
zk-watchers.");
 
     // check participant zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + 
participantManagerToExpire.getSessionId());
-        // System.out.println("participant watch paths: " + watchPaths);
+    result = TestHelper.verify(() -> {
+      Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
+      Set<String> watchPaths = watchers.get("0x" + 
participantManagerToExpire.getSessionId());
 
-        // participant should have 1 zk-watcher: 1 for MESSAGE
-        return watchPaths.size() == 1;
-      }
+      // participant should have 1 zk-watcher: 1 for MESSAGE
+      return watchPaths.size() == 1;
     }, 2000);
     Assert.assertTrue(result, "Participant should have 1 zk-watcher. 
MESSAGES->HelixTaskExecutor");
 
     // check HelixManager#_handlers
-    // printHandlers(controllerManager);
-    // printHandlers(participantManagerToExpire);
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
     Assert.assertEquals(controllerHandlerNb, 8 + 4 * n,
@@ -162,39 +145,27 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     Assert.assertTrue(result);
 
     // check controller zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + 
controller.getSessionId());
-        // System.out.println("controller watch paths after session expiry: " 
+ watchPaths);
+    result = TestHelper.verify(() -> {
+      Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
+      Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
 
-        // where n is number of nodes and r is number of resources
-        // one participant is disconnected, and its task current states are 
removed
-        return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * (n 
- 1) + 6 + r);
-      }
+      // where n is number of nodes and r is number of resources
+      // one participant is disconnected, and its task current states are 
removed
+      return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * (n - 
1) + 6 + r);
     }, 2000);
     Assert.assertTrue(result, "Controller has incorrect number of zk-watchers 
after session expiry.");
 
     // check participant zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + 
participantManagerToExpire.getSessionId());
-        // System.out.println("participant watch paths after session expiry: " 
+ watchPaths);
+    result = TestHelper.verify(() -> {
+      Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
+      Set<String> watchPaths = watchers.get("0x" + 
participantManagerToExpire.getSessionId());
 
-        // participant should have 1 zk-watcher: 1 for MESSAGE
-        return watchPaths.size() == 1;
-      }
+      // participant should have 1 zk-watcher: 1 for MESSAGE
+      return watchPaths.size() == 1;
     }, 2000);
     Assert.assertTrue(result, "Participant should have 1 zk-watcher after 
session expiry.");
 
     // check handlers
-    // printHandlers(controllerManager);
-    // printHandlers(participantManagerToExpire);
     int handlerNb = controller.getHandlers().size();
     Assert.assertEquals(handlerNb, controllerHandlerNb,
         "controller callback handlers should not increase after participant 
session expiry");
@@ -214,7 +185,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
   @Test
   public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception {
-    // Logger.getRootLogger().setLevel(Level.INFO);
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -261,17 +231,13 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     final MockParticipantManager participantManager = participants[0];
 
     // wait until we get all the listeners registered
-    TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        int controllerHandlerNb = controller.getHandlers().size();
-        int particHandlerNb = participantManager.getHandlers().size();
-        if (controllerHandlerNb == 10 && particHandlerNb == 2)
-          return true;
-        else
-          return false;
-      }
+    TestHelper.verify(() -> {
+      int controllerHandlerNb = controller.getHandlers().size();
+      int particHandlerNb = participantManager.getHandlers().size();
+      if (controllerHandlerNb == 10 && particHandlerNb == 2)
+        return true;
+      else
+        return false;
     }, 1000);
 
     int controllerHandlerNb = controller.getHandlers().size();
@@ -296,39 +262,29 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     Assert.assertTrue(verifier.verifyByPolling());
 
     // check controller zk-watchers
-    boolean result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + 
controller.getSessionId());
-        System.err.println("controller watch paths after session expiry: " + 
watchPaths.size());
-
-        // where r is number of resources and n is number of nodes
-        // task resource count does not attribute to ideal state watch paths
-        int expected = (8 + r + (6 + r + taskResourceCount) * n);
-        return watchPaths.size() == expected;
-      }
+    boolean result = TestHelper.verify(() -> {
+      Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
+      Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
+      System.err.println("controller watch paths after session expiry: " + 
watchPaths.size());
+
+      // where r is number of resources and n is number of nodes
+      // task resource count does not attribute to ideal state watch paths
+      int expected = (8 + r + (6 + r + taskResourceCount) * n);
+      return watchPaths.size() == expected;
     }, 2000);
     Assert.assertTrue(result, "Controller has incorrect zk-watchers after 
session expiry.");
 
     // check participant zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
+    result = TestHelper.verify(() -> {
+      Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
+      Set<String> watchPaths = watchers.get("0x" + 
participantManager.getSessionId());
 
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = 
ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + 
participantManager.getSessionId());
-        // System.out.println("participant watch paths after session expiry: " 
+ watchPaths);
-
-        // participant should have 1 zk-watcher: 1 for MESSAGE
-        return watchPaths.size() == 1;
-      }
+      // participant should have 1 zk-watcher: 1 for MESSAGE
+      return watchPaths.size() == 1;
     }, 2000);
     Assert.assertTrue(result, "Participant should have 1 zk-watcher after 
session expiry.");
 
     // check HelixManager#_handlers
-    // printHandlers(controllerManager);
     int handlerNb = controller.getHandlers().size();
     Assert.assertEquals(handlerNb, controllerHandlerNb,
         "controller callback handlers should not increase after participant 
session expiry, but was "
@@ -400,7 +356,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
     MockParticipantManager participantToExpire = participants[0];
     String oldSessionId = participantToExpire.getSessionId();
-    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
 
     // expire participant session; leaked callback handler used to be not 
reset() and be removed from ZkClient
     LOG.info("Expire participant: " + participantToExpire.getInstanceName() + 
", session: "
@@ -471,14 +426,13 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     rpManager.syncStart();
     RoutingTableProvider rp = new RoutingTableProvider(rpManager, 
PropertyType.CURRENTSTATES);
 
-    LOG.info("add job");
     MockParticipantManager jobParticipant = participants[0];
     String jobSessionId = jobParticipant.getSessionId();
-    HelixDataAccessor jobAccesor = jobParticipant.getHelixDataAccessor();
+    HelixDataAccessor jobAccessor = jobParticipant.getHelixDataAccessor();
     PropertyKey.Builder jobKeyBuilder = new PropertyKey.Builder(clusterName);
     PropertyKey db0key =
         jobKeyBuilder.currentState(jobParticipant.getInstanceName(), 
jobSessionId, "TestDB0");
-    CurrentState db0 = jobAccesor.getProperty(db0key);
+    CurrentState db0 = jobAccessor.getProperty(db0key);
     PropertyKey jobKey =
         jobKeyBuilder.currentState(jobParticipant.getInstanceName(), 
jobSessionId, "BackupQueue");
     CurrentState cs = new CurrentState("BackupQueue");
@@ -490,16 +444,16 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
     LOG.info("add job");
     for (int i = 0; i < mJobUpdateCnt; i++) {
-      jobAccesor.setProperty(jobKey, cs);
+      jobAccessor.setProperty(jobKey, cs);
     }
 
     // verify new watcher is installed on the new node
-    boolean result = TestHelper.verify(() -> {
-      return 
ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath());
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(result, "Should get initial clusterConfig callback 
invoked");
-    rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
-    
Assert.assertTrue(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));
+    boolean result = TestHelper.verify(
+        () -> 
ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath())
+            && 
ZkTestHelper.getZkWatch(rpManager.getZkClient()).get("dataWatches")
+            .contains(jobKey.getPath()), TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result,
+        "Should get initial clusterConfig callback invoked and add data 
watchers");
 
     LOG.info("remove job");
     jobParticipant.getZkClient().delete(jobKey.getPath());
@@ -519,7 +473,7 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
-    // rp.shutdown();
+    rp.shutdown();
     rpManager.syncStop();
     TestHelper.dropCluster(clusterName, _gZkClient);
 
@@ -553,16 +507,9 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
       // register a controller listener on participant_0
       if (i == 0) {
-        // ZkHelixTestManager manager = participants[0].getManager();
         MockParticipantManager manager = participants[0];
-        manager.addCurrentStateChangeListener(new CurrentStateChangeListener() 
{
-          @Override
-          public void onStateChange(String instanceName, List<CurrentState> 
statesInfo,
-              NotificationContext changeContext) {
-            // To change body of implemented methods use File | Settings | 
File Templates.
-            // System.out.println(instanceName + " on current-state change, 
type: " +
-            // changeContext.getType());
-          }
+        manager.addCurrentStateChangeListener((instanceName1, statesInfo, 
changeContext) -> {
+          // To change body of implemented methods use File | Settings | File 
Templates.
         }, manager.getInstanceName(), manager.getSessionId());
       }
     }
@@ -586,7 +533,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
         ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
     Map<String, Set<IZkChildListener>> childListeners =
         ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
-    // printZkListeners(participantToExpire.getZkClient());
     Assert.assertEquals(dataListeners.size(), 1,
         "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 
data-listeners");
     String path =
@@ -609,7 +555,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     // check zookeeper#watches on client side
     Map<String, List<String>> watchPaths =
         ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-    // System.out.println("localhost_12918 zk-client side watchPaths: " + 
watchPaths + "\n");
     Assert
         .assertEquals(watchPaths.get("dataWatches").size(), 3,
             "Should have 3 data-watches: CURRENTSTATE/{sessionId}, 
CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
@@ -640,7 +585,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     // check zkclient#listeners
     dataListeners = 
ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
     childListeners = 
ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
-    // printZkListeners(participantToExpire.getZkClient());
     Assert.assertTrue(dataListeners.isEmpty(), "Should have no 
data-listeners");
     Assert
         .assertEquals(
@@ -660,7 +604,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-    // System.out.println("localhost_12918 zk-client side watchPaths: " + 
watchPaths + "\n");
     Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
         "Should have 1 data-watches: MESSAGES");
     Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
@@ -694,7 +637,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-    // System.out.println("localhost_12918 zk-client side watchPaths: " + 
watchPaths + "\n");
     Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
         "Should have 1 data-watches: MESSAGES");
     Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
@@ -705,8 +647,6 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
             0,
             "Should have no exist-watches. exist-watches on 
CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be 
cleared during handleNewSession");
 
-    // Thread.sleep(1000);
-
     // clean up
     controller.syncStop();
     for (int i = 0; i < n; i++) {
@@ -736,31 +676,4 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
     return sb.toString();
   }
-
-  void printZkListeners(HelixZkClient client) throws Exception {
-    Map<String, Set<IZkDataListener>> datalisteners = 
ZkTestHelper.getZkDataListener(client);
-    Map<String, Set<IZkChildListener>> childListeners = 
ZkTestHelper.getZkChildListener(client);
-
-    System.out.println("dataListeners {");
-    for (String path : datalisteners.keySet()) {
-      System.out.println("\t" + path + ": ");
-      Set<IZkDataListener> set = datalisteners.get(path);
-      for (IZkDataListener listener : set) {
-        CallbackHandler handler = (CallbackHandler) listener;
-        System.out.println("\t\t" + handler.getListener());
-      }
-    }
-    System.out.println("}");
-
-    System.out.println("childListeners {");
-    for (String path : childListeners.keySet()) {
-      System.out.println("\t" + path + ": ");
-      Set<IZkChildListener> set = childListeners.get(path);
-      for (IZkChildListener listener : set) {
-        CallbackHandler handler = (CallbackHandler) listener;
-        System.out.println("\t\t" + handler.getListener());
-      }
-    }
-    System.out.println("}");
-  }
 }

Reply via email to