This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 11fa63a  Remove duplicate subscribe in CallBackHandler (#1504)
11fa63a is described below

commit 11fa63ae3039cbf2dc56ca1c9596e8379751db0c
Author: xyuanlu <[email protected]>
AuthorDate: Mon Nov 9 11:50:29 2020 -0800

    Remove duplicate subscribe in CallBackHandler (#1504)
    
    Remove the duplicate subscribe call in CallbackHandler.handleChildChange()
    
    Duplicate subscriptions increase the time spent processing callbacks in 
zkClient, which eventually leads to an increased PendingCallback queue size. This 
PR removes the duplicate subscribeForChanges call in CallbackHandler to improve 
performance.
---
 .../java/org/apache/helix/NotificationContext.java |  9 +++
 .../apache/helix/manager/zk/CallbackHandler.java   | 64 ++--------------------
 .../integration/TestZkCallbackHandlerLeak.java     | 28 ++++++----
 3 files changed, 30 insertions(+), 71 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java 
b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index 4195d94..b35968b 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -43,6 +43,7 @@ public class NotificationContext {
   private String _pathChanged;
   private String _eventName;
   private long _creationTime;
+  private boolean _isChildChange;
 
   /**
    * Get the name associated with the event
@@ -227,4 +228,12 @@ public class NotificationContext {
   public void setChangeType(HelixConstants.ChangeType changeType) {
     this._changeType = changeType;
   }
+
+  public boolean getIsChildChange() {
+    return _isChildChange;
+  }
+
+  public void setIsChildChange(boolean isChildChange) {
+    this._isChildChange = isChildChange;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index dece3de..a57a678 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -118,8 +118,6 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
   }
 
-  // processor to handle async zk event resubscription.
-  private static DedupEventProcessor SubscribeChangeEventProcessor;
 
   private final String _path;
   private final Object _listener;
@@ -142,50 +140,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
   // indicated whether this CallbackHandler is ready to serve event callback 
from ZkClient.
   private boolean _ready = false;
 
-  static {
-    SubscribeChangeEventProcessor = new DedupEventProcessor<CallbackHandler, 
SubscribeChangeEvent>(
-        "Singleton", "CallbackHandler-AsycSubscribe") {
-      @Override
-      protected void handleEvent(SubscribeChangeEvent event) {
-        logger.info("CallbackHandler {}, resubscribe change listener to path: 
{}, for listener: {}, watchChild: {}",
-            event.handler._uid, event.path, event.listener, event.watchChild);
-        try {
-          if (event.handler.isReady()) {
-            event.handler.subscribeForChanges(event.callbackType, event.path, 
event.watchChild);
-          } else {
-            logger.info("CallbackHandler is not ready, stop subscribing 
changes listener to "
-                    + "path: {} for listener: {} watchChild: {}", event.path, 
event.listener,
-                event.listener);
-          }
-        } catch (Exception e) {
-          logger.error("Failed to resubscribe change to path: {} for listener: 
{}", event.path,
-              event.listener, e);
-        }
-      }
-    };
-
-    SubscribeChangeEventProcessor.start();
-  }
-
-  class SubscribeChangeEvent {
-    final CallbackHandler handler;
-    final String path;
-    final NotificationContext.Type callbackType;
-    final Object listener;
-    final boolean watchChild;
-
-    SubscribeChangeEvent(CallbackHandler handler, NotificationContext.Type 
callbackType,
-        String path, boolean watchChild, Object listener) {
-      this.handler = handler;
-      this.path = path;
-      this.callbackType = callbackType;
-      this.listener = listener;
-      this.watchChild = watchChild;
-    }
-  }
-
-  class CallbackProcessor
-      extends DedupEventProcessor<NotificationContext.Type, 
NotificationContext> {
+  class CallbackProcessor extends 
DedupEventProcessor<NotificationContext.Type, NotificationContext> {
     private CallbackHandler _handler;
 
     public CallbackProcessor(CallbackHandler handler) {
@@ -402,13 +357,9 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
       }
       _expectTypes = nextNotificationType.get(type);
 
-      if (type == Type.INIT || type == Type.FINALIZE) {
+      if (type == Type.INIT || type == Type.FINALIZE || 
changeContext.getIsChildChange()) {
         subscribeForChanges(changeContext.getType(), _path, _watchChild);
-      } else {
-        // put SubscribeForChange run in async thread to reduce the latency of 
zk callback handling.
-        subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
       }
-
       if (_changeType == IDEAL_STATE) {
         IdealStateChangeListener idealStateChangeListener = 
(IdealStateChangeListener) _listener;
         List<IdealState> idealStates = preFetch(_propertyKey);
@@ -598,14 +549,6 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     }
   }
 
-  /** Subscribe Changes in asynchronously */
-  private void subscribeForChangesAsyn(NotificationContext.Type callbackType, 
String path,
-      boolean watchChild) {
-    SubscribeChangeEvent subscribeEvent =
-        new SubscribeChangeEvent(this, callbackType, path, watchChild, 
_listener);
-    SubscribeChangeEventProcessor.queueEvent(subscribeEvent.handler, 
subscribeEvent);
-  }
-
   private void subscribeForChanges(NotificationContext.Type callbackType, 
String path,
       boolean watchChild) {
     logger.info("CallbackHandler {} subscribing changes listener to path: {}, 
callback type: {}, "
@@ -734,6 +677,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
         changeContext.setType(NotificationContext.Type.CALLBACK);
         changeContext.setPathChanged(dataPath);
         changeContext.setChangeType(_changeType);
+        changeContext.setIsChildChange(false);
         enqueueTask(changeContext);
       }
     } catch (Exception e) {
@@ -796,7 +740,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
           changeContext.setType(NotificationContext.Type.CALLBACK);
           changeContext.setPathChanged(parentPath);
           changeContext.setChangeType(_changeType);
-          subscribeForChanges(changeContext.getType(), _path, _watchChild);
+          changeContext.setIsChildChange(true);
           enqueueTask(changeContext);
         }
       }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 60dbf91..2f27d4b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -26,9 +26,6 @@ import java.util.Set;
 
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
@@ -40,12 +37,12 @@ import 
org.apache.helix.integration.manager.ClusterSpectatorManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.manager.ZkTestManager;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.helix.tools.ClusterStateVerifier;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.slf4j.Logger;
@@ -467,23 +464,32 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     cs.setSessionId(jobSessionId);
     cs.setStateModelDefRef(db0.getStateModelDefRef());
 
+    Map<String, List<String>> rpWatchPaths = 
ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    
Assert.assertFalse(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));
+
     LOG.info("add job");
-    boolean rtJob = false;
     for (int i = 0; i < mJobUpdateCnt; i++) {
-      rtJob = jobAccesor.setProperty(jobKey, cs);
+      jobAccesor.setProperty(jobKey, cs);
     }
 
+    // verify new watcher is installed on the new node
+    boolean result = TestHelper.verify(() -> {
+      return 
ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath());
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Should get initial clusterConfig callback 
invoked");
+    rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    
Assert.assertTrue(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));
+
     LOG.info("remove job");
-    rtJob = jobParticipant.getZkClient().delete(jobKey.getPath());
+    jobParticipant.getZkClient().delete(jobKey.getPath());
 
     // validate the job watch is not leaked.
     Thread.sleep(5000);
 
     Map<String, Set<String>> listenersByZkPath = 
ZkTestHelper.getListenersByZkPath(ZK_ADDR);
-    boolean jobKeyExists = 
listenersByZkPath.keySet().contains(jobKey.getPath());
-    Assert.assertFalse(jobKeyExists);
+    Assert.assertFalse(listenersByZkPath.keySet().contains(jobKey.getPath()));
 
-    Map<String, List<String>> rpWatchPaths = 
ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
     List<String> existWatches = rpWatchPaths.get("existWatches");
     Assert.assertTrue(existWatches.isEmpty());
 

Reply via email to