This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d11320d RATIS-706. Dead lock in GrpcClientRpc. Contributed by Tsz Wo
Nicholas Sze.
d11320d is described below
commit d11320db65ce26b40994959c8a72c8f69556ad81
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Oct 17 23:56:07 2019 +0530
RATIS-706. Dead lock in GrpcClientRpc. Contributed by Tsz Wo Nicholas Sze.
---
.../java/org/apache/ratis/util/PeerProxyMap.java | 47 ++++++++-----
.../src/test/java/org/apache/ratis/BaseTest.java | 16 ++++-
.../org/apache/ratis/util/TestPeerProxyMap.java | 80 ++++++++++++++++++++++
3 files changed, 126 insertions(+), 17 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 514c37e..ea0c2f1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -28,6 +28,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/** A map from peer id to peer and its proxy. */
@@ -35,7 +36,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements
Closeable {
public static final Logger LOG = LoggerFactory.getLogger(PeerProxyMap.class);
/** Peer and its proxy. */
- private class PeerAndProxy implements Closeable {
+ private class PeerAndProxy {
private final RaftPeer peer;
private volatile PROXY proxy = null;
private final LifeCycle lifeCycle;
@@ -65,19 +66,18 @@ public class PeerProxyMap<PROXY extends Closeable>
implements Closeable {
return proxy;
}
+ Optional<PROXY> setNullProxyAndClose() {
+ final PROXY p;
+ synchronized (this) {
+ p = proxy;
+ lifeCycle.checkStateAndClose(() -> proxy = null);
+ }
+ return Optional.ofNullable(p);
+ }
+
@Override
- public synchronized void close() {
- lifeCycle.checkStateAndClose(() -> {
- if (proxy != null) {
- try {
- LOG.debug("{}: Closing proxy for peer {}", name, peer);
- proxy.close();
- } catch (IOException e) {
- LOG.warn("{}: Failed to close proxy for peer {}, proxy class: {}",
- name, peer, proxy.getClass(), e);
- }
- }
- });
+ public String toString() {
+ return peer.toString();
}
}
@@ -120,14 +120,18 @@ public class PeerProxyMap<PROXY extends Closeable>
implements Closeable {
public void resetProxy(RaftPeerId id) {
LOG.debug("{}: reset proxy for {}", name, id );
+ final PeerAndProxy pp;
+ Optional<PROXY> optional = Optional.empty();
synchronized (resetLock) {
- final PeerAndProxy pp = peers.remove(id);
+ pp = peers.remove(id);
if (pp != null) {
final RaftPeer peer = pp.getPeer();
- pp.close();
+ optional = pp.setNullProxyAndClose();
computeIfAbsent(peer);
}
}
+ // close proxy without holding the reset lock
+ optional.ifPresent(proxy -> closeProxy(proxy, pp));
}
/** @return true if the given throwable is handled; otherwise, the call is
a no-op, return false. */
@@ -145,6 +149,17 @@ public class PeerProxyMap<PROXY extends Closeable>
implements Closeable {
@Override
public void close() {
- peers.values().forEach(PeerAndProxy::close);
+ peers.values().parallelStream().forEach(
+ pp -> pp.setNullProxyAndClose().ifPresent(proxy -> closeProxy(proxy,
pp)));
+ }
+
+ private void closeProxy(PROXY proxy, PeerAndProxy pp) {
+ try {
+ LOG.debug("{}: Closing proxy for peer {}", name, pp);
+ proxy.close();
+ } catch (IOException e) {
+ LOG.warn("{}: Failed to close proxy for peer {}, proxy class: {}",
+ name, pp, proxy.getClass(), e);
+ }
}
}
\ No newline at end of file
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 6598ba5..5edbf38 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public abstract class BaseTest {
@@ -56,8 +57,21 @@ public abstract class BaseTest {
ExitUtils.disableSystemExit();
}
+ private final AtomicReference<Throwable> firstException = new
AtomicReference<>();
+
+ public void setFirstException(Throwable e) {
+ if (firstException.compareAndSet(null, e)) {
+ LOG.error("Set firstException", e);
+ }
+ }
+
@After
- public void assertNotTerminated() {
+ public void assertNoFailures() {
+ final Throwable e = firstException.get();
+ if (e != null) {
+ throw new IllegalStateException("Failed: first exception was set", e);
+ }
+
ExitUtils.assertNotTerminated();
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/util/TestPeerProxyMap.java
b/ratis-test/src/test/java/org/apache/ratis/util/TestPeerProxyMap.java
new file mode 100644
index 0000000..e0e1c49
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestPeerProxyMap.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Closeable;
+
+/** Tests for {@link PeerProxyMap}. */
+public class TestPeerProxyMap extends BaseTest {
+ class DummyProxy implements Closeable {
+ private final RaftPeer peer;
+
+ DummyProxy(RaftPeer peer) {
+ this.peer = peer;
+ }
+
+ @Override
+ public void close() {
+ LOG.info("{}: close before lock", this);
+ synchronized(this) {
+ LOG.info("{}: close in lock", this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return peer.toString();
+ }
+ }
+
+ @Test(timeout = 10_000)
+ public void testCloseDeadLock() throws Exception {
+ final PeerProxyMap<DummyProxy> map = new PeerProxyMap<>("test",
DummyProxy::new);
+ final RaftPeerId id = RaftPeerId.valueOf("s0");
+ final RaftPeer peer = new RaftPeer(id);
+ map.computeIfAbsent(peer);
+
+ final DummyProxy proxy = map.getProxy(id);
+
+ final Thread t = new Thread(() -> {
+ // hold proxy lock and then getProxy(..) which requires resetLock
+ synchronized (proxy) {
+ LOG.info("Acquired lock");
+ try {
+ HUNDRED_MILLIS.sleep();
+ LOG.info("Try getProxy");
+ final DummyProxy newProxy = map.getProxy(id);
+ Assert.assertNotSame(proxy, newProxy);
+ } catch (Exception e) {
+ setFirstException(e);
+ }
+ LOG.info("Will release lock");
+ }
+ });
+ t.start();
+
+ map.resetProxy(id); // hold resetLock and then call close() which requires
proxy lock
+ t.join();
+ }
+}