[
https://issues.apache.org/jira/browse/ZOOKEEPER-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lujie updated ZOOKEEPER-4711:
-----------------------------
Description:
When we run :
mvn test -Dmaven.test.failure.ignore=true
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
-DfailIfNoTests=false -DredirectTestOutputToFile=false
The following method in class : org.apache.zookeeper.server.watch.WatcherCleaner
{code:java}
public void addDeadWatcher(int watcherBit) {
// Wait if there are too many watchers waiting to be closed,
// this is will slow down the socket packet processing and
// the adding watches in the ZK pipeline.
while (maxInProcessingDeadWatchers > 0 && !stopped &&
totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
try {
RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
long startTime = Time.currentElapsedTime();
synchronized (processingCompletedEvent) {
processingCompletedEvent.wait(100);
}
long latency = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
} catch (InterruptedException e) {
LOG.info("Got interrupted while waiting for dead watches queue
size");
break;
}
}
synchronized (this) {
if (deadWatchers.add(watcherBit)) {
totalDeadWatchers.incrementAndGet();
ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
if (deadWatchers.size() >= watcherCleanThreshold) {
synchronized (cleanEvent) {
cleanEvent.notifyAll();
}
}
}
}
}{code}
{code:java}
@Override
public void run() {
while (!stopped) {
synchronized (cleanEvent) {
try {
// add some jitter to avoid cleaning dead watchers at the
// same time in the quorum
if (!stopped && deadWatchers.size() <
watcherCleanThreshold) {
int maxWaitMs = (watcherCleanIntervalInSeconds
+
ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) *
1000;
cleanEvent.wait(maxWaitMs);
}
} catch (InterruptedException e) {
LOG.info("Received InterruptedException while waiting for
cleanEvent");
break;
}
} if (deadWatchers.isEmpty()) {
continue;
} synchronized (this) {
// Clean the dead watchers need to go through all the current
// watches, which is pretty heavy and may take a second if
// there are millions of watches, that's why we're doing lazily
// batch clean up in a separate thread with a snapshot of the
// current dead watchers.
final Set<Integer> snapshot = new HashSet<>(deadWatchers);
deadWatchers.clear();
int total = snapshot.size();
LOG.info("Processing {} dead watchers", total);
cleaners.schedule(new WorkRequest() {
@Override
public void doWork() throws Exception {
long startTime = Time.currentElapsedTime();
listener.processDeadWatchers(snapshot);
long latency = Time.currentElapsedTime() - startTime;
LOG.info("Takes {} to process {} watches", latency,
total);
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
totalDeadWatchers.addAndGet(-total);
synchronized (processingCompletedEvent) {
processingCompletedEvent.notifyAll();
}
}
});
}
}
LOG.info("WatcherCleaner thread exited");
}{code}
As we can see, the two methods access the deadWatchers object from different threads.
The *thread in run()* performs *read* operations on deadWatchers, and the thread in
addDeadWatcher performs *write* operations on deadWatchers. This causes a data race
without any lock.
was:
When we run :
mvn test -Dmaven.test.failure.ignore=true
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
-DfailIfNoTests=false -DredirectTestOutputToFile=false
The method of addDeadWatcher
(
System.out.println("2s::" +Thread.currentThread().getName()+ "
"+System.identityHashCode(deadWatchers)+" " + System.currentTimeMillis());
this is my debug info.
)
{code:java}
public void addDeadWatcher(int watcherBit) {
// Wait if there are too many watchers waiting to be closed,
// this is will slow down the socket packet processing and
// the adding watches in the ZK pipeline.
while (maxInProcessingDeadWatchers > 0 && !stopped &&
totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
try {
RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
long startTime = Time.currentElapsedTime();
synchronized (processingCompletedEvent) {
processingCompletedEvent.wait(100);
}
long latency = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
} catch (InterruptedException e) {
LOG.info("Got interrupted while waiting for dead watches queue
size");
break;
}
}
synchronized (this) {
if (deadWatchers.add(watcherBit)) {
totalDeadWatchers.incrementAndGet();
ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
if (deadWatchers.size() >= watcherCleanThreshold) {
synchronized (cleanEvent) {
cleanEvent.notifyAll();
}
}
}
}
}{code}
{code:java}
@Override
public void run() {
while (!stopped) {
synchronized (cleanEvent) {
try {
// add some jitter to avoid cleaning dead watchers at the
// same time in the quorum
if (!stopped && deadWatchers.size() <
watcherCleanThreshold) {
int maxWaitMs = (watcherCleanIntervalInSeconds
+
ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) *
1000;
cleanEvent.wait(maxWaitMs);
}
} catch (InterruptedException e) {
LOG.info("Received InterruptedException while waiting for
cleanEvent");
break;
}
} if (deadWatchers.isEmpty()) {
continue;
} synchronized (this) {
// Clean the dead watchers need to go through all the current
// watches, which is pretty heavy and may take a second if
// there are millions of watches, that's why we're doing lazily
// batch clean up in a separate thread with a snapshot of the
// current dead watchers.
final Set<Integer> snapshot = new HashSet<>(deadWatchers);
deadWatchers.clear();
int total = snapshot.size();
LOG.info("Processing {} dead watchers", total);
cleaners.schedule(new WorkRequest() {
@Override
public void doWork() throws Exception {
long startTime = Time.currentElapsedTime();
listener.processDeadWatchers(snapshot);
long latency = Time.currentElapsedTime() - startTime;
LOG.info("Takes {} to process {} watches", latency,
total);
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
totalDeadWatchers.addAndGet(-total);
synchronized (processingCompletedEvent) {
processingCompletedEvent.notifyAll();
}
}
});
}
}
LOG.info("WatcherCleaner thread exited");
}{code}
As we can see, the two methods access the deadWatchers object from different threads.
The *thread in run()* performs *read* operations on deadWatchers, and the thread in
addDeadWatcher performs *write* operations on deadWatchers. This causes a data race
without any lock.
> a data race in
> org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
> -----------------------------------------------------------------------------------
>
> Key: ZOOKEEPER-4711
> URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4711
> Project: ZooKeeper
> Issue Type: Bug
> Components: server
> Affects Versions: 3.9.0
> Environment: download zookeeper 3.9.0-SNAPSHOT from github repository
> ([https://github.com/apache/zookeeper)]
> Then run : mvn test -Dmaven.test.failure.ignore=true
> -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
> -DfailIfNoTests=false -DredirectTestOutputToFile=false
> Reporter: lujie
> Priority: Critical
>
> When we run :
> mvn test -Dmaven.test.failure.ignore=true
> -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
> -DfailIfNoTests=false -DredirectTestOutputToFile=false
> The following method in class :
> org.apache.zookeeper.server.watch.WatcherCleaner
> {code:java}
> public void addDeadWatcher(int watcherBit) {
> // Wait if there are too many watchers waiting to be closed,
> // this is will slow down the socket packet processing and
> // the adding watches in the ZK pipeline.
> while (maxInProcessingDeadWatchers > 0 && !stopped &&
> totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
> try {
> RATE_LOGGER.rateLimitLog("Waiting for dead watchers
> cleaning");
> long startTime = Time.currentElapsedTime();
> synchronized (processingCompletedEvent) {
> processingCompletedEvent.wait(100);
> }
> long latency = Time.currentElapsedTime() - startTime;
>
> ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
> } catch (InterruptedException e) {
> LOG.info("Got interrupted while waiting for dead watches
> queue size");
> break;
> }
> }
> synchronized (this) {
>
> if (deadWatchers.add(watcherBit)) {
> totalDeadWatchers.incrementAndGet();
> ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
> if (deadWatchers.size() >= watcherCleanThreshold) {
> synchronized (cleanEvent) {
> cleanEvent.notifyAll();
> }
> }
> }
> }
> }{code}
> {code:java}
> @Override
> public void run() {
> while (!stopped) {
> synchronized (cleanEvent) {
> try {
> // add some jitter to avoid cleaning dead watchers at the
> // same time in the quorum
> if (!stopped && deadWatchers.size() <
> watcherCleanThreshold) {
>
> int maxWaitMs = (watcherCleanIntervalInSeconds
> +
> ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) *
> 1000;
> cleanEvent.wait(maxWaitMs);
> }
> } catch (InterruptedException e) {
> LOG.info("Received InterruptedException while waiting for
> cleanEvent");
> break;
> }
> } if (deadWatchers.isEmpty()) {
> continue;
> } synchronized (this) {
> // Clean the dead watchers need to go through all the current
> // watches, which is pretty heavy and may take a second if
> // there are millions of watches, that's why we're doing
> lazily
> // batch clean up in a separate thread with a snapshot of the
> // current dead watchers.
> final Set<Integer> snapshot = new HashSet<>(deadWatchers);
> deadWatchers.clear();
> int total = snapshot.size();
> LOG.info("Processing {} dead watchers", total);
> cleaners.schedule(new WorkRequest() {
> @Override
> public void doWork() throws Exception {
> long startTime = Time.currentElapsedTime();
> listener.processDeadWatchers(snapshot);
> long latency = Time.currentElapsedTime() - startTime;
> LOG.info("Takes {} to process {} watches", latency,
> total);
>
> ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
>
> ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
> totalDeadWatchers.addAndGet(-total);
> synchronized (processingCompletedEvent) {
> processingCompletedEvent.notifyAll();
> }
> }
> });
> }
> }
> LOG.info("WatcherCleaner thread exited");
> }{code}
> As we can see, the two methods access the deadWatchers object from different
> threads. The *thread in run()* performs *read* operations on deadWatchers, and the
> thread in addDeadWatcher performs *write* operations on deadWatchers. This causes a
> data race without any lock.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)