This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3238840  Reduce lock contention in CallbackHandler.enqueueTask (#1589)
3238840 is described below

commit 32388408faeee80b5eacaaf6da1306abb86ed0c7
Author: xyuanlu <[email protected]>
AuthorDate: Fri Dec 18 16:40:29 2020 -0800

    Reduce lock contention in CallbackHandler.enqueueTask (#1589)
    
    Reduce lock contention in CallbackHandler.enqueueTask using atomic ref.
---
 .../apache/helix/manager/zk/CallbackHandler.java   | 42 +++++++++++-----------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 8342ef5..03c5f73 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixConstants.ChangeType;
@@ -133,7 +134,7 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
   private HelixCallbackMonitor _monitor;
 
   // TODO: make this be per _manager or per _listener instaed of per 
callbackHandler -- Lei
-  private CallbackProcessor _batchCallbackProcessor;
+  private AtomicReference<CallbackProcessor> _batchCallbackProcessorRef = new 
AtomicReference<>();
   private boolean _watchChild = true; // Whether we should subscribe to the 
child znode's data
   // change.
 
@@ -319,13 +320,12 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
         logger.info("CallbackHandler {} is not ready, ignore change callback 
from path: {}, for "
             + "listener: {}", _uid, _path, _listener);
       } else {
-        synchronized (this) {
-          if (_batchCallbackProcessor != null) {
-            _batchCallbackProcessor.queueEvent(changeContext.getType(), 
changeContext);
-          } else {
-            throw new HelixException(
-                "Failed to process callback in batch mode. Batch Callback 
Processor does not exist.");
-          }
+        CallbackProcessor callbackProcessor = _batchCallbackProcessorRef.get();
+        if (callbackProcessor != null) {
+          callbackProcessor.queueEvent(changeContext.getType(), changeContext);
+        } else {
+          throw new HelixException(
+              "Failed to process callback in batch mode. Batch Callback 
Processor does not exist.");
         }
       }
     } else {
@@ -643,12 +643,14 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     logger.info("initializing CallbackHandler: {}, content: {} ", _uid, 
getContent());
 
     if (_batchModeEnabled) {
-      synchronized (this) {
-        if (_batchCallbackProcessor != null) {
-          _batchCallbackProcessor.resetEventQueue();
-        } else {
-          _batchCallbackProcessor = new CallbackProcessor(this);
-          _batchCallbackProcessor.start();
+      CallbackProcessor callbackProcessor = _batchCallbackProcessorRef.get();
+      if (callbackProcessor != null) {
+        callbackProcessor.resetEventQueue();
+      } else {
+        callbackProcessor = new CallbackProcessor(this);
+        callbackProcessor.start();
+        if (!_batchCallbackProcessorRef.compareAndSet(null, 
callbackProcessor)) {
+          callbackProcessor.shutdown();
         }
       }
     }
@@ -766,16 +768,16 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
         isShutdown);
     try {
       _ready = false;
-      synchronized (this) {
-        if (_batchCallbackProcessor != null) {
+      CallbackProcessor callbackProcessor = _batchCallbackProcessorRef.get();
+        if (callbackProcessor != null) {
           if (isShutdown) {
-            _batchCallbackProcessor.shutdown();
-            _batchCallbackProcessor = null;
+            if (_batchCallbackProcessorRef.compareAndSet(callbackProcessor, 
null)) {
+              callbackProcessor.shutdown();
+            }
           } else {
-            _batchCallbackProcessor.resetEventQueue();
+            callbackProcessor.resetEventQueue();
           }
         }
-      }
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);
       changeContext.setChangeType(_changeType);

Reply via email to