This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4906c8b2 RATIS-1567. NettyServerStreamRpc may throw NPE. (#634)
4906c8b2 is described below
commit 4906c8b296aa8cc9a67c0736ff48a038de1a0393
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Apr 26 15:56:26 2022 +0800
RATIS-1567. NettyServerStreamRpc may throw NPE. (#634)
---
.../java/org/apache/ratis/util/ConcurrentUtils.java | 16 ++++++++++++++--
.../main/java/org/apache/ratis/util/PeerProxyMap.java | 13 +++++++++++--
.../main/java/org/apache/ratis/util/TimeDuration.java | 1 +
.../ratis/netty/server/NettyServerStreamRpc.java | 18 +++++++++---------
4 files changed, 35 insertions(+), 13 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index da61c9a9..d0c87288 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -116,11 +116,23 @@ public interface ConcurrentUtils {
* @param executor The executor to be shut down.
*/
static void shutdownAndWait(ExecutorService executor) {
+ shutdownAndWait(TimeDuration.ONE_DAY, executor, timeout -> {
+ throw new IllegalStateException(executor.getClass().getName() + " shutdown timeout in " + timeout);
+ });
+ }
+
+ static void shutdownAndWait(TimeDuration waitTime, ExecutorService executor, Consumer<TimeDuration> timoutHandler) {
+ executor.shutdown();
try {
- executor.shutdown();
- Preconditions.assertTrue(executor.awaitTermination(1, TimeUnit.DAYS));
+ if (executor.awaitTermination(waitTime.getDuration(), waitTime.getUnit())) {
+ return;
+ }
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
+ return;
+ }
+ if (timoutHandler != null) {
+ timoutHandler.accept(waitTime);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 4e15a400..d9f0107a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.function.CheckedFunction;
+import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,8 +122,16 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
}
}
- public void computeIfAbsent(RaftPeer p) {
- peers.computeIfAbsent(p.getId(), k -> new PeerAndProxy(p));
+ /**
+ * This method is similar to {@link Map#computeIfAbsent(Object, java.util.function.Function)}
+ * except that this method does not require a mapping function.
+ *
+ * @param peer the peer for retrieving/building a proxy.
+ * @return a supplier of the proxy for the given peer.
+ */
+ public CheckedSupplier<PROXY, IOException> computeIfAbsent(RaftPeer peer) {
+ final PeerAndProxy peerAndProxy = peers.computeIfAbsent(peer.getId(), k -> new PeerAndProxy(peer));
+ return peerAndProxy::getProxy;
}
public void resetProxy(RaftPeerId id) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 0a9523c2..58a61cc2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -42,6 +42,7 @@ public final class TimeDuration implements Comparable<TimeDuration> {
public static final TimeDuration ONE_MILLISECOND = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
public static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
public static final TimeDuration ONE_MINUTE = TimeDuration.valueOf(1, TimeUnit.MINUTES);
+ public static final TimeDuration ONE_DAY = TimeDuration.valueOf(1, TimeUnit.DAYS);
static final double ERROR_THRESHOLD = 0.001; // accept 0.1% error
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 03287006..4f862928 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -52,9 +52,11 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
+import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +102,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
throws IOException {
for (RaftPeer peer : peers) {
try {
- outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(request));
+ outs.add((DataStreamOutputRpc) map.computeIfAbsent(peer).get().stream(request));
} catch (IOException e) {
map.handleException(peer.getId(), e, true);
throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
@@ -164,9 +166,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
@Override
public void addRaftPeers(Collection<RaftPeer> newPeers) {
- for (int i = 0; i < proxies.size(); i ++) {
- proxies.get(i).addPeers(newPeers);
- }
+ proxies.forEach(proxy -> proxy.addPeers(newPeers));
}
static class RequestRef {
@@ -272,15 +272,15 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
channelFuture.channel().close().sync();
bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
- bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
- workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, bossGroup,
+ timeout -> LOG.warn("{}: bossGroup shutdown timeout in " + timeout, this));
+ ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, workerGroup,
+ timeout -> LOG.warn("{}: workerGroup shutdown timeout in " + timeout, this));
} catch (InterruptedException e) {
LOG.error(this + ": Interrupted close()", e);
}
- for (int i = 0; i < proxies.size(); i ++) {
- proxies.get(i).close();
- }
+ proxies.forEach(Proxies::close);
}
@Override