lujie created ZOOKEEPER-4711:
--------------------------------

             Summary: Data race between run() and "public void
addDeadWatcher(int watcherBit)" in
org.apache.zookeeper.server.watch.WatcherCleaner when running the
org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers JUnit test
                 Key: ZOOKEEPER-4711
                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4711
             Project: ZooKeeper
          Issue Type: Bug
          Components: server
    Affects Versions: 3.9.0
         Environment: ZooKeeper 3.9.0-SNAPSHOT checked out from the GitHub
repository (https://github.com/apache/zookeeper)

Then run: mvn test -Dmaven.test.failure.ignore=true
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
-DfailIfNoTests=false -DredirectTestOutputToFile=false
            Reporter: lujie


When we run:

mvn test -Dmaven.test.failure.ignore=true
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
-DfailIfNoTests=false -DredirectTestOutputToFile=false

the addDeadWatcher method below runs concurrently with run(). (For
debugging, I added the following line to it:
System.out.println("2s::" + Thread.currentThread().getName() + "  " + System.identityHashCode(deadWatchers) + "  " + System.currentTimeMillis());
that line is my own debug output, not part of the original source.)
{code:java}
public void addDeadWatcher(int watcherBit) {
    // Wait if there are too many watchers waiting to be closed,
    // this will slow down the socket packet processing and
    // the adding of watches in the ZK pipeline.
    while (maxInProcessingDeadWatchers > 0 && !stopped
            && totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
        try {
            RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
            long startTime = Time.currentElapsedTime();
            synchronized (processingCompletedEvent) {
                processingCompletedEvent.wait(100);
            }
            long latency = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
        } catch (InterruptedException e) {
            LOG.info("Got interrupted while waiting for dead watches queue size");
            break;
        }
    }
    synchronized (this) {
        if (deadWatchers.add(watcherBit)) {
            totalDeadWatchers.incrementAndGet();
            ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
            if (deadWatchers.size() >= watcherCleanThreshold) {
                synchronized (cleanEvent) {
                    cleanEvent.notifyAll();
                }
            }
        }
    }
}{code}
 
{code:java}
@Override
public void run() {
    while (!stopped) {
        synchronized (cleanEvent) {
            try {
                // add some jitter to avoid cleaning dead watchers at the
                // same time in the quorum
                if (!stopped && deadWatchers.size() < watcherCleanThreshold) {
                    int maxWaitMs = (watcherCleanIntervalInSeconds
                            + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
                    cleanEvent.wait(maxWaitMs);
                }
            } catch (InterruptedException e) {
                LOG.info("Received InterruptedException while waiting for cleanEvent");
                break;
            }
        }

        if (deadWatchers.isEmpty()) {
            continue;
        }

        synchronized (this) {
            // Cleaning the dead watchers needs to go through all the current
            // watches, which is pretty heavy and may take a second if there
            // are millions of watches; that's why we do a lazy batch clean-up
            // in a separate thread with a snapshot of the current dead
            // watchers.
            final Set<Integer> snapshot = new HashSet<>(deadWatchers);
            deadWatchers.clear();
            int total = snapshot.size();
            LOG.info("Processing {} dead watchers", total);
            cleaners.schedule(new WorkRequest() {
                @Override
                public void doWork() throws Exception {
                    long startTime = Time.currentElapsedTime();
                    listener.processDeadWatchers(snapshot);
                    long latency = Time.currentElapsedTime() - startTime;
                    LOG.info("Takes {} to process {} watches", latency, total);
                    ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
                    ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
                    totalDeadWatchers.addAndGet(-total);
                    synchronized (processingCompletedEvent) {
                        processingCompletedEvent.notifyAll();
                    }
                }
            });
        }
    }
    LOG.info("WatcherCleaner thread exited");
}{code}
As the code shows, the two methods access the deadWatchers set from different
threads. The WatcherCleaner thread in run() performs *read* operations on
deadWatchers (deadWatchers.size() under synchronized (cleanEvent), and
deadWatchers.isEmpty() under no lock at all), while the thread calling
addDeadWatcher() performs *write* operations on it under synchronized (this).
Because the reads and the writes are not guarded by a common lock, this is a
data race.
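
One possible direction for a fix (just my sketch, not an official patch): make
run() read deadWatchers only while holding the same monitor (this) that
addDeadWatcher() holds for its writes. Note that addDeadWatcher() acquires
cleanEvent while already holding this, so the sketch takes this before, never
inside, the cleanEvent block, to keep a consistent lock order and avoid a
deadlock:

{code:java}
// Sketch only: same fields and helpers as the current WatcherCleaner,
// with the unsynchronized reads of deadWatchers moved under "this".
@Override
public void run() {
    while (!stopped) {
        boolean belowThreshold;
        synchronized (this) {
            // same lock as the writes in addDeadWatcher()
            belowThreshold = deadWatchers.size() < watcherCleanThreshold;
        }
        synchronized (cleanEvent) {
            try {
                // jitter as before; "this" is deliberately NOT taken here,
                // because addDeadWatcher() locks cleanEvent inside "this"
                if (!stopped && belowThreshold) {
                    int maxWaitMs = (watcherCleanIntervalInSeconds
                            + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
                    cleanEvent.wait(maxWaitMs);
                }
            } catch (InterruptedException e) {
                LOG.info("Received InterruptedException while waiting for cleanEvent");
                break;
            }
        }
        synchronized (this) {
            if (deadWatchers.isEmpty()) {
                continue; // releases the monitor on the way out
            }
            // ... snapshot, clear, and schedule exactly as in the current run() ...
        }
    }
    LOG.info("WatcherCleaner thread exited");
}{code}
Hoisting the size check slightly widens the window between the check and the
wait(), but cleanEvent.wait() already has a timeout and the condition is
re-evaluated on every loop iteration, so a missed notification delays cleanup
by at most one interval. Replacing deadWatchers with a concurrent set would
also remove the race on size()/isEmpty(), but the snapshot-and-clear step
would still need synchronized (this) to stay atomic with respect to
concurrent adds.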



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
