NissimShiman commented on code in PR #6414:
URL: https://github.com/apache/nifi/pull/6414#discussion_r972262766


##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java:
##########
@@ -157,4 +170,12 @@ public void channelReadComplete(final 
ChannelHandlerContext ctx) {
             promiseHandshakeComplete.setSuccess();
         }
     }
+
+    public boolean isSuccess() {

Review Comment:
   Nitpick, but add javadoc here (and for next method as well) 



##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java:
##########
@@ -64,24 +65,36 @@ public class CacheClientHandshakeHandler extends 
ChannelInboundHandlerAdapter {
      */
     private final VersionNegotiator versionNegotiator;
 
+    /**
+     * THe network timeout associated with handshake completion
+     */
+    private final long timeoutMillis;
+
     /**
      * Constructor.
      *
      * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
      * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     * @param timeoutMillis     the network timeout associated with handshake 
completion
      */
-    public CacheClientHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+    public CacheClientHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator,
+                                       final long timeoutMillis) {
         this.promiseHandshakeComplete = channel.newPromise();
         this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);
         this.versionNegotiator = versionNegotiator;
+        this.timeoutMillis = timeoutMillis;
     }
 
     /**
      * API providing client application with visibility into the handshake 
process.  Distributed cache requests
      * should not be sent using this {@link Channel} until the handshake is 
complete.
      */
     public void waitHandshakeComplete() {

Review Comment:
   maybe add that isSuccess() should be called following completion of this 
method (i.e. it is only safe to continue processing ifSuccess() is true)



##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java:
##########
@@ -312,6 +355,27 @@ private DistributedMapCacheClientService 
createClient(final int port) throws Ini
         return client;
     }
 
+    private Thread startServerSocket(ServerSocketChannel serverSocketChannel) {
+        final Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    final SocketChannel socketChannel;

Review Comment:
   I don't think we need this line.
   line in try could be:
   serverSocketChannel.accept();



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to