[
https://issues.apache.org/jira/browse/CURATOR-335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313204#comment-15313204
]
ASF GitHub Bot commented on CURATOR-335:
----------------------------------------
GitHub user Randgalt opened a pull request:
https://github.com/apache/curator/pull/155
[CURATOR-335] InterProcessSemaphoreV2 can deadlock under network stress
If there is a network event after the semaphore's node is created but
before getChildren() is called, the previous implementation would orphan the
newly created node, causing a deadlock later on.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/apache/curator CURATOR-335
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/curator/pull/155.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #155
----
----
> InterProcessSemaphoreV2 can deadlock under network stress
> ---------------------------------------------------------
>
> Key: CURATOR-335
> URL: https://issues.apache.org/jira/browse/CURATOR-335
> Project: Apache Curator
> Issue Type: Bug
> Components: Recipes
> Affects Versions: 3.1.0, 2.10.0
> Reporter: Jordan Zimmerman
> Assignee: Jordan Zimmerman
> Priority: Critical
> Fix For: 2.11.0, 3.2.0
>
>
> Under network stress, InterProcessSemaphoreV2 can stop acquiring new leases.
> This test (by [~cammckenzie]) shows the issue:
> {code}
> package org.apache.curator.framework.recipes.locks;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.framework.state.ConnectionStateListener;
> import org.apache.curator.retry.RetryOneTime;
> import org.apache.curator.test.BaseClassForTests;
> import org.apache.curator.utils.CloseableUtils;
> import org.apache.zookeeper.KeeperException;
> import org.testng.Assert;
> import org.testng.ITestContext;
> import org.testng.annotations.BeforeSuite;
> import org.testng.annotations.Test;
> public class TestInterProcessMutexNotReconnecting extends BaseClassForTests
> {
>     /**
>      * Regression test for CURATOR-335: checks that {@link InterProcessSemaphoreV2}
>      * leases can be acquired again after the client's session is lost and the
>      * test server is restarted.
>      * <p>
>      * NUM_CLIENTS worker threads loop acquiring a semaphore with a single lease,
>      * holding it for 2 s and returning it. The server is then stopped until the
>      * client reports {@link ConnectionState#LOST}, restarted, and the test asserts
>      * that at least one more lease is acquired within 30 s.
>      * (Despite the class name, the recipe exercised here is the semaphore.)
>      */
>     @Test
>     public void test() throws Exception
>     {
>         final String SEMAPHORE_PATH = "/test";
>         final int MAX_SEMAPHORES = 1;
>         final int NUM_CLIENTS = 10;
>         // Upper bounds on each wait so a regression fails the test instead of hanging it.
>         final long LOST_TIMEOUT_MS = 30000;
>         final long REACQUIRE_TIMEOUT_MS = 30000;
>         final long POLL_INTERVAL_MS = 100;
>
>         server.start();
>
>         CuratorFramework client = null;
>         ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);
>
>         // Total number of leases acquired by all workers.
>         final AtomicInteger counter = new AtomicInteger(0);
>         // Cleared in the finally block to stop the worker loops.
>         final AtomicBoolean run = new AtomicBoolean(true);
>
>         try
>         {
>             client = CuratorFrameworkFactory.newClient(server.getConnectString(), 5000, 5000, new RetryOneTime(1));
>             client.start();
>
>             final CuratorFramework lClient = client;
>
>             for(int i = 0; i < NUM_CLIENTS; ++i)
>             {
>                 executor.execute(new Runnable()
>                 {
>                     @Override
>                     public void run()
>                     {
>                         while(run.get())
>                         {
>                             InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(lClient, SEMAPHORE_PATH, MAX_SEMAPHORES);
>                             System.err.println(Thread.currentThread() + "Acquiring");
>                             Lease lease = null;
>                             try
>                             {
>                                 lease = semaphore.acquire();
>                                 System.err.println(Thread.currentThread() + "Acquired");
>                                 counter.incrementAndGet();
>                                 Thread.sleep(2000);
>                             }
>                             catch(InterruptedException e)
>                             {
>                                 System.err.println("Interrupted");
>                                 Thread.currentThread().interrupt();
>                                 break;
>                             }
>                             catch(KeeperException e)
>                             {
>                                 // ZooKeeper error (e.g. while the server is stopped): back off 2 s, then retry.
>                                 try
>                                 {
>                                     Thread.sleep(2000);
>                                 }
>                                 catch(InterruptedException e2)
>                                 {
>                                     System.err.println("Interrupted");
>                                     Thread.currentThread().interrupt();
>                                     break;
>                                 }
>                             }
>                             catch(Exception e)
>                             {
>                                 e.printStackTrace();
>                             }
>                             finally
>                             {
>                                 // Runs on every exit path, including the 'break's above.
>                                 if(lease != null)
>                                 {
>                                     semaphore.returnLease(lease);
>                                 }
>                             }
>                         }
>                     }
>                 });
>             }
>
>             final AtomicBoolean lost = new AtomicBoolean(false);
>             client.getConnectionStateListenable().addListener(new ConnectionStateListener()
>             {
>                 @Override
>                 public void stateChanged(CuratorFramework client, ConnectionState newState)
>                 {
>                     System.err.println("New state : " + newState);
>
>                     if(newState == ConnectionState.LOST)
>                     {
>                         lost.set(true);
>                     }
>                 }
>             });
>
>             // Let the workers acquire some leases while the server is up.
>             Thread.sleep(2000);
>
>             System.err.println("Stopping server");
>             server.stop();
>             System.err.println("Stopped server");
>
>             // Wait, with an upper bound, until the client reports the session as LOST.
>             long lostDeadline = System.currentTimeMillis() + LOST_TIMEOUT_MS;
>             while(!lost.get())
>             {
>                 if(System.currentTimeMillis() > lostDeadline)
>                 {
>                     Assert.fail("Connection state LOST not reported after server stop");
>                 }
>                 Thread.sleep(1000);
>             }
>
>             int preRestartCount = counter.get();
>
>             System.err.println("Restarting server");
>             server.restart();
>
>             // Poll until some worker acquires a new lease. Sleep between checks
>             // rather than busy-spinning on the counter.
>             long reacquireDeadline = System.currentTimeMillis() + REACQUIRE_TIMEOUT_MS;
>             while(counter.get() <= preRestartCount)
>             {
>                 if(System.currentTimeMillis() > reacquireDeadline)
>                 {
>                     Assert.fail("Semaphores not reacquired after restart");
>                 }
>                 Thread.sleep(POLL_INTERVAL_MS);
>             }
>         }
>         finally
>         {
>             run.set(false);
>             executor.shutdownNow();
>             try
>             {
>                 // shutdownNow() only interrupts the workers. Give them a bounded chance to
>                 // finish (and return any held lease) before closing the client they use.
>                 executor.awaitTermination(10, TimeUnit.SECONDS);
>             }
>             finally
>             {
>                 CloseableUtils.closeQuietly(client);
>             }
>         }
>     }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)