[ 
https://issues.apache.org/jira/browse/ZOOKEEPER-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lujie updated ZOOKEEPER-4711:
-----------------------------
    Description: 
When we run:

mvn test -Dmaven.test.failure.ignore=true 
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers 
-DfailIfNoTests=false -DredirectTestOutputToFile=false

Consider the following two methods in class org.apache.zookeeper.server.watch.WatcherCleaner:
{code:java}
public void addDeadWatcher(int watcherBit) {
        // Wait if there are too many watchers waiting to be closed,
        // this is will slow down the socket packet processing and
        // the adding watches in the ZK pipeline.
        while (maxInProcessingDeadWatchers > 0 && !stopped && 
totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
            try {
                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
                long startTime = Time.currentElapsedTime();
                synchronized (processingCompletedEvent) {
                    processingCompletedEvent.wait(100);
                }
                long latency = Time.currentElapsedTime() - startTime;
                
ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
            } catch (InterruptedException e) {
                LOG.info("Got interrupted while waiting for dead watches queue 
size");
                break;
            }
        }
        synchronized (this) {
            
            if (deadWatchers.add(watcherBit)) {
                totalDeadWatchers.incrementAndGet();
                ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
                if (deadWatchers.size() >= watcherCleanThreshold) {
                    synchronized (cleanEvent) {
                        cleanEvent.notifyAll();
                    }
                }
            }

        }
    }{code}
{code:java}
@Override
    public void run() {
        while (!stopped) {
            synchronized (cleanEvent) {
                try {
                    // add some jitter to avoid cleaning dead watchers at the
                    // same time in the quorum
                    if (!stopped && deadWatchers.size() < 
watcherCleanThreshold) {
                        
                        int maxWaitMs = (watcherCleanIntervalInSeconds
                                         + 
ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 
1000;
                        cleanEvent.wait(maxWaitMs);
                    }
                } catch (InterruptedException e) {
                    LOG.info("Received InterruptedException while waiting for 
cleanEvent");
                    break;
                }
            }            if (deadWatchers.isEmpty()) {
                continue;
            }            synchronized (this) {
                // Clean the dead watchers need to go through all the current
                // watches, which is pretty heavy and may take a second if
                // there are millions of watches, that's why we're doing lazily
                // batch clean up in a separate thread with a snapshot of the
                // current dead watchers.
                final Set<Integer> snapshot = new HashSet<>(deadWatchers);
                deadWatchers.clear();
                int total = snapshot.size();
                LOG.info("Processing {} dead watchers", total);
                cleaners.schedule(new WorkRequest() {
                    @Override
                    public void doWork() throws Exception {
                        long startTime = Time.currentElapsedTime();
                        listener.processDeadWatchers(snapshot);
                        long latency = Time.currentElapsedTime() - startTime;
                        LOG.info("Takes {} to process {} watches", latency, 
total);
                        
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
                        
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
                        totalDeadWatchers.addAndGet(-total);
                        synchronized (processingCompletedEvent) {
                            processingCompletedEvent.notifyAll();
                        }
                    }
                });
            }
        }
        LOG.info("WatcherCleaner thread exited");
    }{code}
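As we can see, the two methods access the deadWatchers object from different threads. 
The thread in *run()* performs *read* operations on deadWatchers (deadWatchers.size() 
while holding only the cleanEvent monitor, and deadWatchers.isEmpty() while holding no 
lock at all). Meanwhile, the thread in addDeadWatcher performs *write* operations on 
deadWatchers while holding the monitor of this. Because the reads are not guarded by the 
same lock as the writes, this is a data race.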

  was:
When we run:

mvn test -Dmaven.test.failure.ignore=true 
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers 
-DfailIfNoTests=false -DredirectTestOutputToFile=false

The method of addDeadWatcher

(
            System.out.println("2s::" +Thread.currentThread().getName()+ "  
"+System.identityHashCode(deadWatchers)+"  " + System.currentTimeMillis());
this is my debug info.
)
{code:java}
public void addDeadWatcher(int watcherBit) {
        // Wait if there are too many watchers waiting to be closed,
        // this is will slow down the socket packet processing and
        // the adding watches in the ZK pipeline.
        while (maxInProcessingDeadWatchers > 0 && !stopped && 
totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
            try {
                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
                long startTime = Time.currentElapsedTime();
                synchronized (processingCompletedEvent) {
                    processingCompletedEvent.wait(100);
                }
                long latency = Time.currentElapsedTime() - startTime;
                
ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
            } catch (InterruptedException e) {
                LOG.info("Got interrupted while waiting for dead watches queue 
size");
                break;
            }
        }
        synchronized (this) {
            
            if (deadWatchers.add(watcherBit)) {
                totalDeadWatchers.incrementAndGet();
                ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
                if (deadWatchers.size() >= watcherCleanThreshold) {
                    synchronized (cleanEvent) {
                        cleanEvent.notifyAll();
                    }
                }
            }

        }
    }{code}
 
{code:java}
@Override
    public void run() {
        while (!stopped) {
            synchronized (cleanEvent) {
                try {
                    // add some jitter to avoid cleaning dead watchers at the
                    // same time in the quorum
                    if (!stopped && deadWatchers.size() < 
watcherCleanThreshold) {
                        
                        int maxWaitMs = (watcherCleanIntervalInSeconds
                                         + 
ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 
1000;
                        cleanEvent.wait(maxWaitMs);
                    }
                } catch (InterruptedException e) {
                    LOG.info("Received InterruptedException while waiting for 
cleanEvent");
                    break;
                }
            }            if (deadWatchers.isEmpty()) {
                continue;
            }            synchronized (this) {
                // Clean the dead watchers need to go through all the current
                // watches, which is pretty heavy and may take a second if
                // there are millions of watches, that's why we're doing lazily
                // batch clean up in a separate thread with a snapshot of the
                // current dead watchers.
                final Set<Integer> snapshot = new HashSet<>(deadWatchers);
                deadWatchers.clear();
                int total = snapshot.size();
                LOG.info("Processing {} dead watchers", total);
                cleaners.schedule(new WorkRequest() {
                    @Override
                    public void doWork() throws Exception {
                        long startTime = Time.currentElapsedTime();
                        listener.processDeadWatchers(snapshot);
                        long latency = Time.currentElapsedTime() - startTime;
                        LOG.info("Takes {} to process {} watches", latency, 
total);
                        
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
                        
ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
                        totalDeadWatchers.addAndGet(-total);
                        synchronized (processingCompletedEvent) {
                            processingCompletedEvent.notifyAll();
                        }
                    }
                });
            }
        }
        LOG.info("WatcherCleaner thread exited");
    }{code}
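As we can see, the two methods access the deadWatchers object from different threads. 
The thread in *run()* performs *read* operations on deadWatchers (deadWatchers.size() 
while holding only the cleanEvent monitor, and deadWatchers.isEmpty() while holding no 
lock at all). Meanwhile, the thread in addDeadWatcher performs *write* operations on 
deadWatchers while holding the monitor of this. Because the reads are not guarded by the 
same lock as the writes, this is a data race.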


> a data race in 
> org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers 
> -----------------------------------------------------------------------------------
>
>                 Key: ZOOKEEPER-4711
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4711
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: server
>    Affects Versions: 3.9.0
>         Environment: download zookeeper 3.9.0-SNAPSHOT from github repository 
> ([https://github.com/apache/zookeeper])
> Then run : mvn test -Dmaven.test.failure.ignore=true 
> -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers 
> -DfailIfNoTests=false -DredirectTestOutputToFile=false
>            Reporter: lujie
>            Priority: Critical
>
> When we run:
> mvn test -Dmaven.test.failure.ignore=true 
> -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers 
> -DfailIfNoTests=false -DredirectTestOutputToFile=false
> Consider the following two methods in class 
> org.apache.zookeeper.server.watch.WatcherCleaner:
> {code:java}
> public void addDeadWatcher(int watcherBit) {
>         // Wait if there are too many watchers waiting to be closed,
>         // this is will slow down the socket packet processing and
>         // the adding watches in the ZK pipeline.
>         while (maxInProcessingDeadWatchers > 0 && !stopped && 
> totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
>             try {
>                 RATE_LOGGER.rateLimitLog("Waiting for dead watchers 
> cleaning");
>                 long startTime = Time.currentElapsedTime();
>                 synchronized (processingCompletedEvent) {
>                     processingCompletedEvent.wait(100);
>                 }
>                 long latency = Time.currentElapsedTime() - startTime;
>                 
> ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
>             } catch (InterruptedException e) {
>                 LOG.info("Got interrupted while waiting for dead watches 
> queue size");
>                 break;
>             }
>         }
>         synchronized (this) {
>             
>             if (deadWatchers.add(watcherBit)) {
>                 totalDeadWatchers.incrementAndGet();
>                 ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
>                 if (deadWatchers.size() >= watcherCleanThreshold) {
>                     synchronized (cleanEvent) {
>                         cleanEvent.notifyAll();
>                     }
>                 }
>             }
>         }
>     }{code}
> {code:java}
> @Override
>     public void run() {
>         while (!stopped) {
>             synchronized (cleanEvent) {
>                 try {
>                     // add some jitter to avoid cleaning dead watchers at the
>                     // same time in the quorum
>                     if (!stopped && deadWatchers.size() < 
> watcherCleanThreshold) {
>                         
>                         int maxWaitMs = (watcherCleanIntervalInSeconds
>                                          + 
> ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 
> 1000;
>                         cleanEvent.wait(maxWaitMs);
>                     }
>                 } catch (InterruptedException e) {
>                     LOG.info("Received InterruptedException while waiting for 
> cleanEvent");
>                     break;
>                 }
>             }            if (deadWatchers.isEmpty()) {
>                 continue;
>             }            synchronized (this) {
>                 // Clean the dead watchers need to go through all the current
>                 // watches, which is pretty heavy and may take a second if
>                 // there are millions of watches, that's why we're doing 
> lazily
>                 // batch clean up in a separate thread with a snapshot of the
>                 // current dead watchers.
>                 final Set<Integer> snapshot = new HashSet<>(deadWatchers);
>                 deadWatchers.clear();
>                 int total = snapshot.size();
>                 LOG.info("Processing {} dead watchers", total);
>                 cleaners.schedule(new WorkRequest() {
>                     @Override
>                     public void doWork() throws Exception {
>                         long startTime = Time.currentElapsedTime();
>                         listener.processDeadWatchers(snapshot);
>                         long latency = Time.currentElapsedTime() - startTime;
>                         LOG.info("Takes {} to process {} watches", latency, 
> total);
>                         
> ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
>                         
> ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
>                         totalDeadWatchers.addAndGet(-total);
>                         synchronized (processingCompletedEvent) {
>                             processingCompletedEvent.notifyAll();
>                         }
>                     }
>                 });
>             }
>         }
>         LOG.info("WatcherCleaner thread exited");
>     }{code}
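> As we can see, the two methods access the deadWatchers object from different 
> threads. The thread in *run()* performs *read* operations on deadWatchers 
> (deadWatchers.size() while holding only the cleanEvent monitor, and 
> deadWatchers.isEmpty() while holding no lock at all). Meanwhile, the thread in 
> addDeadWatcher performs *write* operations on deadWatchers while holding the 
> monitor of this. Because the reads are not guarded by the same lock as the 
> writes, this is a data race.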



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to