NissimShiman commented on code in PR #6414:
URL: https://github.com/apache/nifi/pull/6414#discussion_r972262766
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java:
##########
@@ -157,4 +170,12 @@ public void channelReadComplete(final ChannelHandlerContext ctx) {
promiseHandshakeComplete.setSuccess();
}
}
+
+ public boolean isSuccess() {
Review Comment:
Nitpick: please add Javadoc here (and for the next method as well).
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java:
##########
@@ -64,24 +65,36 @@ public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter {
*/
private final VersionNegotiator versionNegotiator;
+ /**
+ * The network timeout associated with handshake completion
+ */
+ private final long timeoutMillis;
+
/**
* Constructor.
*
* @param channel the channel to which this {@link io.netty.channel.ChannelHandler} is bound.
* @param versionNegotiator coordinator used to broker the version of the distributed cache protocol with the service
+ * @param timeoutMillis the network timeout associated with handshake completion
*/
- public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator) {
+ public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator,
+ final long timeoutMillis) {
this.promiseHandshakeComplete = channel.newPromise();
this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);
this.versionNegotiator = versionNegotiator;
+ this.timeoutMillis = timeoutMillis;
}
/**
* API providing client application with visibility into the handshake process. Distributed cache requests
* should not be sent using this {@link Channel} until the handshake is complete.
*/
public void waitHandshakeComplete() {
Review Comment:
Maybe add that isSuccess() should be called once this method returns
(i.e. it is only safe to continue processing if isSuccess() is true).
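A rough sketch of the calling pattern I have in mind, using only the JDK rather than Netty. Only the names waitHandshakeComplete(), isSuccess() and timeoutMillis come from the diff; HandshakeSketch, HandshakeCaller, complete() and awaitOrFail() are hypothetical stand-ins, and the latch-based wait is an assumption, not how the handler in this PR is implemented.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the handshake handler; not the NiFi class. */
class HandshakeSketch {
    private final CountDownLatch done = new CountDownLatch(1);
    private final long timeoutMillis;
    private volatile boolean success;

    HandshakeSketch(final long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Hypothetical hook: the I/O side calls this when the handshake succeeds. */
    void complete() {
        success = true;
        done.countDown();
    }

    /**
     * Blocks until the handshake finishes or the timeout elapses.
     * Returning does NOT imply success; callers must check isSuccess().
     */
    void waitHandshakeComplete() {
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            // Preserve the interrupt for the caller; success stays false.
            Thread.currentThread().interrupt();
        }
    }

    /** @return true only if the handshake finished successfully */
    boolean isSuccess() {
        return success;
    }
}

/** Hypothetical caller showing the wait-then-check order. */
class HandshakeCaller {
    static void awaitOrFail(final HandshakeSketch handshake) throws IOException {
        handshake.waitHandshakeComplete();
        if (!handshake.isSuccess()) {
            throw new IOException("Handshake did not complete successfully");
        }
    }
}
```

The point is only the ordering: wait first, then refuse to send cache requests unless isSuccess() returns true. Stating that in the Javadoc would make the contract obvious to callers.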
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java:
##########
@@ -312,6 +355,27 @@ private DistributedMapCacheClientService createClient(final int port) throws Ini
return client;
}
+ private Thread startServerSocket(ServerSocketChannel serverSocketChannel) {
+ final Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ final SocketChannel socketChannel;
Review Comment:
I don't think we need this line. The line in the try block could simply be:
serverSocketChannel.accept();
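For illustration, a minimal sketch of the helper without the local variable. The rest of the method is not visible in this hunk, so the class name SilentServerSketch, the lambda, the daemon flag and the catch behaviour are all assumptions on my part. It also assumes the channel is already bound and in blocking mode.

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;

/** Hypothetical test helper; a sketch, not the code in this PR. */
class SilentServerSketch {
    /**
     * Starts a thread that accepts connections and never replies,
     * so a client handshake against this port can only time out.
     */
    static Thread startServerSocket(final ServerSocketChannel serverSocketChannel) {
        final Runnable runnable = () -> {
            while (true) {
                try {
                    // The accepted channel is intentionally not stored or used.
                    serverSocketChannel.accept();
                } catch (final IOException e) {
                    // Closing the server channel ends the loop.
                    return;
                }
            }
        };
        final Thread thread = new Thread(runnable);
        thread.setDaemon(true); // assumption: do not keep the JVM alive after the test
        thread.start();
        return thread;
    }
}
```

Closing the ServerSocketChannel from the test makes the blocked accept() throw, which is what stops the thread here.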