[ https://issues.apache.org/jira/browse/ZOOKEEPER-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lujie updated ZOOKEEPER-4711: ----------------------------- Description: When we run : mvn test -Dmaven.test.failure.ignore=true -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers -DfailIfNoTests=false -DredirectTestOutputToFile=false The following method in class : org.apache.zookeeper.server.watch.WatcherCleaner {code:java} public void addDeadWatcher(int watcherBit) { // Wait if there are too many watchers waiting to be closed, // this is will slow down the socket packet processing and // the adding watches in the ZK pipeline. while (maxInProcessingDeadWatchers > 0 && !stopped && totalDeadWatchers.get() >= maxInProcessingDeadWatchers) { try { RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning"); long startTime = Time.currentElapsedTime(); synchronized (processingCompletedEvent) { processingCompletedEvent.wait(100); } long latency = Time.currentElapsedTime() - startTime; ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency); } catch (InterruptedException e) { LOG.info("Got interrupted while waiting for dead watches queue size"); break; } } synchronized (this) { if (deadWatchers.add(watcherBit)) { totalDeadWatchers.incrementAndGet(); ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1); if (deadWatchers.size() >= watcherCleanThreshold) { synchronized (cleanEvent) { cleanEvent.notifyAll(); } } } } }{code} {code:java} @Override public void run() { while (!stopped) { synchronized (cleanEvent) { try { // add some jitter to avoid cleaning dead watchers at the // same time in the quorum if (!stopped && deadWatchers.size() < watcherCleanThreshold) { int maxWaitMs = (watcherCleanIntervalInSeconds + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000; cleanEvent.wait(maxWaitMs); } } catch (InterruptedException e) { LOG.info("Received InterruptedException while waiting for cleanEvent"); break; } } if (deadWatchers.isEmpty()) { continue; } synchronized (this) { // Clean the dead watchers need to 
go through all the current // watches, which is pretty heavy and may take a second if // there are millions of watches, that's why we're doing lazily // batch clean up in a separate thread with a snapshot of the // current dead watchers. final Set<Integer> snapshot = new HashSet<>(deadWatchers); deadWatchers.clear(); int total = snapshot.size(); LOG.info("Processing {} dead watchers", total); cleaners.schedule(new WorkRequest() { @Override public void doWork() throws Exception { long startTime = Time.currentElapsedTime(); listener.processDeadWatchers(snapshot); long latency = Time.currentElapsedTime() - startTime; LOG.info("Takes {} to process {} watches", latency, total); ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency); ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total); totalDeadWatchers.addAndGet(-total); synchronized (processingCompletedEvent) { processingCompletedEvent.notifyAll(); } } }); } } LOG.info("WatcherCleaner thread exited"); }{code} As we can see, the two methods visit the deadWatchers object from different threads. The *thread in run()* performs *read* operations on deadWatchers, and the thread in addDeadWatcher performs *write* operations on deadWatchers. This causes a data race without any lock. was: When we run : mvn test -Dmaven.test.failure.ignore=true -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers -DfailIfNoTests=false -DredirectTestOutputToFile=false The method of addDeadWatcher ( System.out.println("2s::" +Thread.currentThread().getName()+ " "+System.identityHashCode(deadWatchers)+" " + System.currentTimeMillis()); this is my debug info. ) {code:java} public void addDeadWatcher(int watcherBit) { // Wait if there are too many watchers waiting to be closed, // this is will slow down the socket packet processing and // the adding watches in the ZK pipeline.
while (maxInProcessingDeadWatchers > 0 && !stopped && totalDeadWatchers.get() >= maxInProcessingDeadWatchers) { try { RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning"); long startTime = Time.currentElapsedTime(); synchronized (processingCompletedEvent) { processingCompletedEvent.wait(100); } long latency = Time.currentElapsedTime() - startTime; ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency); } catch (InterruptedException e) { LOG.info("Got interrupted while waiting for dead watches queue size"); break; } } synchronized (this) { if (deadWatchers.add(watcherBit)) { totalDeadWatchers.incrementAndGet(); ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1); if (deadWatchers.size() >= watcherCleanThreshold) { synchronized (cleanEvent) { cleanEvent.notifyAll(); } } } } }{code} {code:java} @Override public void run() { while (!stopped) { synchronized (cleanEvent) { try { // add some jitter to avoid cleaning dead watchers at the // same time in the quorum if (!stopped && deadWatchers.size() < watcherCleanThreshold) { int maxWaitMs = (watcherCleanIntervalInSeconds + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000; cleanEvent.wait(maxWaitMs); } } catch (InterruptedException e) { LOG.info("Received InterruptedException while waiting for cleanEvent"); break; } } if (deadWatchers.isEmpty()) { continue; } synchronized (this) { // Clean the dead watchers need to go through all the current // watches, which is pretty heavy and may take a second if // there are millions of watches, that's why we're doing lazily // batch clean up in a separate thread with a snapshot of the // current dead watchers. 
final Set<Integer> snapshot = new HashSet<>(deadWatchers); deadWatchers.clear(); int total = snapshot.size(); LOG.info("Processing {} dead watchers", total); cleaners.schedule(new WorkRequest() { @Override public void doWork() throws Exception { long startTime = Time.currentElapsedTime(); listener.processDeadWatchers(snapshot); long latency = Time.currentElapsedTime() - startTime; LOG.info("Takes {} to process {} watches", latency, total); ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency); ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total); totalDeadWatchers.addAndGet(-total); synchronized (processingCompletedEvent) { processingCompletedEvent.notifyAll(); } } }); } } LOG.info("WatcherCleaner thread exited"); }{code} As we can see, the two methods visit the deadWatchers object from different threads. The *thread in run()* performs *read* operations on deadWatchers, and the thread in addDeadWatcher performs *write* operations on deadWatchers. This causes a data race without any lock. > a data race in > org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers > ----------------------------------------------------------------------------------- > > Key: ZOOKEEPER-4711 > URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4711 > Project: ZooKeeper > Issue Type: Bug > Components: server > Affects Versions: 3.9.0 > Environment: download zookeeper 3.9.0-SNAPSHOT from github repository > ([https://github.com/apache/zookeeper)] > Then run : mvn test -Dmaven.test.failure.ignore=true > -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers > -DfailIfNoTests=false -DredirectTestOutputToFile=false > Reporter: lujie > Priority: Critical > > When we run : > mvn test -Dmaven.test.failure.ignore=true > -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers > -DfailIfNoTests=false -DredirectTestOutputToFile=false > The following method in class : > org.apache.zookeeper.server.watch.WatcherCleaner > {code:java} > public void
addDeadWatcher(int watcherBit) { > // Wait if there are too many watchers waiting to be closed, > // this is will slow down the socket packet processing and > // the adding watches in the ZK pipeline. > while (maxInProcessingDeadWatchers > 0 && !stopped && > totalDeadWatchers.get() >= maxInProcessingDeadWatchers) { > try { > RATE_LOGGER.rateLimitLog("Waiting for dead watchers > cleaning"); > long startTime = Time.currentElapsedTime(); > synchronized (processingCompletedEvent) { > processingCompletedEvent.wait(100); > } > long latency = Time.currentElapsedTime() - startTime; > > ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency); > } catch (InterruptedException e) { > LOG.info("Got interrupted while waiting for dead watches > queue size"); > break; > } > } > synchronized (this) { > > if (deadWatchers.add(watcherBit)) { > totalDeadWatchers.incrementAndGet(); > ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1); > if (deadWatchers.size() >= watcherCleanThreshold) { > synchronized (cleanEvent) { > cleanEvent.notifyAll(); > } > } > } > } > }{code} > {code:java} > @Override > public void run() { > while (!stopped) { > synchronized (cleanEvent) { > try { > // add some jitter to avoid cleaning dead watchers at the > // same time in the quorum > if (!stopped && deadWatchers.size() < > watcherCleanThreshold) { > > int maxWaitMs = (watcherCleanIntervalInSeconds > + > ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * > 1000; > cleanEvent.wait(maxWaitMs); > } > } catch (InterruptedException e) { > LOG.info("Received InterruptedException while waiting for > cleanEvent"); > break; > } > } if (deadWatchers.isEmpty()) { > continue; > } synchronized (this) { > // Clean the dead watchers need to go through all the current > // watches, which is pretty heavy and may take a second if > // there are millions of watches, that's why we're doing > lazily > // batch clean up in a separate thread with a snapshot of the > // current dead 
watchers. > final Set<Integer> snapshot = new HashSet<>(deadWatchers); > deadWatchers.clear(); > int total = snapshot.size(); > LOG.info("Processing {} dead watchers", total); > cleaners.schedule(new WorkRequest() { > @Override > public void doWork() throws Exception { > long startTime = Time.currentElapsedTime(); > listener.processDeadWatchers(snapshot); > long latency = Time.currentElapsedTime() - startTime; > LOG.info("Takes {} to process {} watches", latency, > total); > > ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency); > > ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total); > totalDeadWatchers.addAndGet(-total); > synchronized (processingCompletedEvent) { > processingCompletedEvent.notifyAll(); > } > } > }); > } > } > LOG.info("WatcherCleaner thread exited"); > }{code} > As we can see, the two methods visit the deadWatchers object from different > threads. The *thread in run()* performs *read* operations on deadWatchers, and the thread in > addDeadWatcher performs *write* operations on deadWatchers. This causes a data race > without any lock. -- This message was sent by Atlassian Jira (v8.20.10#820010)