This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 3238840 Reduce lock contention in CallbackHandler.enqueueTask (#1589)
3238840 is described below
commit 32388408faeee80b5eacaaf6da1306abb86ed0c7
Author: xyuanlu <[email protected]>
AuthorDate: Fri Dec 18 16:40:29 2020 -0800
Reduce lock contention in CallbackHandler.enqueueTask (#1589)
Reduce lock contention in CallbackHandler.enqueueTask by holding the batch callback processor in an AtomicReference instead of guarding it with a synchronized block.
---
.../apache/helix/manager/zk/CallbackHandler.java | 42 +++++++++++-----------
1 file changed, 22 insertions(+), 20 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 8342ef5..03c5f73 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixConstants.ChangeType;
@@ -133,7 +134,7 @@ public class CallbackHandler implements IZkChildListener,
IZkDataListener {
private HelixCallbackMonitor _monitor;
// TODO: make this be per _manager or per _listener instead of per
callbackHandler -- Lei
- private CallbackProcessor _batchCallbackProcessor;
+ private AtomicReference<CallbackProcessor> _batchCallbackProcessorRef = new
AtomicReference<>();
private boolean _watchChild = true; // Whether we should subscribe to the
child znode's data
// change.
@@ -319,13 +320,12 @@ public class CallbackHandler implements IZkChildListener,
IZkDataListener {
logger.info("CallbackHandler {} is not ready, ignore change callback
from path: {}, for "
+ "listener: {}", _uid, _path, _listener);
} else {
- synchronized (this) {
- if (_batchCallbackProcessor != null) {
- _batchCallbackProcessor.queueEvent(changeContext.getType(),
changeContext);
- } else {
- throw new HelixException(
- "Failed to process callback in batch mode. Batch Callback
Processor does not exist.");
- }
+ CallbackProcessor callbackProcessor = _batchCallbackProcessorRef.get();
+ if (callbackProcessor != null) {
+ callbackProcessor.queueEvent(changeContext.getType(), changeContext);
+ } else {
+ throw new HelixException(
+ "Failed to process callback in batch mode. Batch Callback
Processor does not exist.");
}
}
} else {
@@ -643,12 +643,14 @@ public class CallbackHandler implements IZkChildListener,
IZkDataListener {
logger.info("initializing CallbackHandler: {}, content: {} ", _uid,
getContent());
if (_batchModeEnabled) {
- synchronized (this) {
- if (_batchCallbackProcessor != null) {
- _batchCallbackProcessor.resetEventQueue();
- } else {
- _batchCallbackProcessor = new CallbackProcessor(this);
- _batchCallbackProcessor.start();
+ CallbackProcessor callbackProcessor = _batchCallbackProcessorRef.get();
+ if (callbackProcessor != null) {
+ callbackProcessor.resetEventQueue();
+ } else {
+ callbackProcessor = new CallbackProcessor(this);
+ callbackProcessor.start();
+ if (!_batchCallbackProcessorRef.compareAndSet(null,
callbackProcessor)) {
+ callbackProcessor.shutdown();
}
}
}
@@ -766,16 +768,16 @@ public class CallbackHandler implements IZkChildListener,
IZkDataListener {
isShutdown);
try {
_ready = false;
- synchronized (this) {
- if (_batchCallbackProcessor != null) {
+ CallbackProcessor callbackProcessor = _batchCallbackProcessorRef.get();
+ if (callbackProcessor != null) {
if (isShutdown) {
- _batchCallbackProcessor.shutdown();
- _batchCallbackProcessor = null;
+ if (_batchCallbackProcessorRef.compareAndSet(callbackProcessor,
null)) {
+ callbackProcessor.shutdown();
+ }
} else {
- _batchCallbackProcessor.resetEventQueue();
+ callbackProcessor.resetEventQueue();
}
}
- }
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.FINALIZE);
changeContext.setChangeType(_changeType);