This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4906c8b2 RATIS-1567. NettyServerStreamRpc may throw NPE. (#634)
4906c8b2 is described below

commit 4906c8b296aa8cc9a67c0736ff48a038de1a0393
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Apr 26 15:56:26 2022 +0800

    RATIS-1567. NettyServerStreamRpc may throw NPE. (#634)
---
 .../java/org/apache/ratis/util/ConcurrentUtils.java    | 16 ++++++++++++++--
 .../main/java/org/apache/ratis/util/PeerProxyMap.java  | 13 +++++++++++--
 .../main/java/org/apache/ratis/util/TimeDuration.java  |  1 +
 .../ratis/netty/server/NettyServerStreamRpc.java       | 18 +++++++++---------
 4 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index da61c9a9..d0c87288 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -116,11 +116,23 @@ public interface ConcurrentUtils {
    * @param executor The executor to be shut down.
    */
   static void shutdownAndWait(ExecutorService executor) {
+    shutdownAndWait(TimeDuration.ONE_DAY, executor, timeout -> {
+      throw new IllegalStateException(executor.getClass().getName() + " shutdown timeout in " + timeout);
+    });
+  }
+
+  static void shutdownAndWait(TimeDuration waitTime, ExecutorService executor, Consumer<TimeDuration> timoutHandler) {
+    executor.shutdown();
     try {
-      executor.shutdown();
-      Preconditions.assertTrue(executor.awaitTermination(1, TimeUnit.DAYS));
+      if (executor.awaitTermination(waitTime.getDuration(), waitTime.getUnit())) {
+        return;
+      }
     } catch (InterruptedException ignored) {
       Thread.currentThread().interrupt();
+      return;
+    }
+    if (timoutHandler != null) {
+      timoutHandler.accept(waitTime);
     }
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 4e15a400..d9f0107a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.function.CheckedFunction;
+import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,8 +122,16 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
     }
   }
 
-  public void computeIfAbsent(RaftPeer p) {
-    peers.computeIfAbsent(p.getId(), k -> new PeerAndProxy(p));
+  /**
+   * This method is similar to {@link Map#computeIfAbsent(Object, java.util.function.Function)}
+   * except that this method does not require a mapping function.
+   *
+   * @param peer the peer for retrieving/building a proxy.
+   * @return a supplier of the proxy for the given peer.
+   */
+  public CheckedSupplier<PROXY, IOException> computeIfAbsent(RaftPeer peer) {
+    final PeerAndProxy peerAndProxy = peers.computeIfAbsent(peer.getId(), k -> new PeerAndProxy(peer));
+    return peerAndProxy::getProxy;
   }
 
   public void resetProxy(RaftPeerId id) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 0a9523c2..58a61cc2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -42,6 +42,7 @@ public final class TimeDuration implements Comparable<TimeDuration> {
   public static final TimeDuration ONE_MILLISECOND = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
   public static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
   public static final TimeDuration ONE_MINUTE = TimeDuration.valueOf(1, TimeUnit.MINUTES);
+  public static final TimeDuration ONE_DAY = TimeDuration.valueOf(1, TimeUnit.DAYS);
 
   static final double ERROR_THRESHOLD = 0.001; // accept 0.1% error
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 03287006..4f862928 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -52,9 +52,11 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
+import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,7 +102,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         throws IOException {
       for (RaftPeer peer : peers) {
         try {
-          outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(request));
+          outs.add((DataStreamOutputRpc) map.computeIfAbsent(peer).get().stream(request));
         } catch (IOException e) {
           map.handleException(peer.getId(), e, true);
           throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
@@ -164,9 +166,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   @Override
   public void addRaftPeers(Collection<RaftPeer> newPeers) {
-    for (int i = 0; i < proxies.size(); i ++) {
-      proxies.get(i).addPeers(newPeers);
-    }
+    proxies.forEach(proxy -> proxy.addPeers(newPeers));
   }
 
   static class RequestRef {
@@ -272,15 +272,15 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       channelFuture.channel().close().sync();
       bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
       workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
-      bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
-      workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, bossGroup,
+          timeout -> LOG.warn("{}: bossGroup shutdown timeout in " + timeout, this));
+      ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, workerGroup,
+          timeout -> LOG.warn("{}: workerGroup shutdown timeout in " + timeout, this));
     } catch (InterruptedException e) {
       LOG.error(this + ": Interrupted close()", e);
     }
 
-    for (int i = 0; i < proxies.size(); i ++) {
-      proxies.get(i).close();
-    }
+    proxies.forEach(Proxies::close);
   }
 
   @Override

Reply via email to