exceptionfactory commented on a change in pull request #5311:
URL: https://github.com/apache/nifi/pull/5311#discussion_r693181684



##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
+        this.channelPool = NettyChannelPoolFactory.createChannelPool(context, 
versionNegotiator);
+        this.cacheClient = new NettyDistributedCacheClient(channelPool);
+    }
+
+    @OnDisabled
+    public void onDisabled()  {
+        logger.info("onDisabled()");
+        try {
+            this.cacheClient.invoke(new OutboundAdapter().write("close"), new 
VoidInboundAdapter());

Review comment:
       It would be helpful to create an enum to define standard operations as 
opposed to using the string operation name such as `close`.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);

Review comment:
       It would be helpful to create an enum for these versions instead of 
using the numbers directly. Perhaps something like `CacheProtocolVersion`?

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/InboundToken.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client.adapter;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+
+import java.io.IOException;
+
+/**
+ * Tracker for receipt of incoming tokens from an InputStream.
+ *
+ * @param <T> the expected type of the service response token
+ */
+public class InboundToken<T> {
+
+    /**
+     * Flag indicating that all bytes needed to deserialize a token have been 
received.
+     */
+    private boolean isComplete;
+
+    /**
+     * The length of the byte stream expected from the service response.
+     */
+    private Integer length;
+
+    /**
+     * The received service method response value.  Depending on the data type 
and the deserializer
+     * implementation, it is possible to deserialize a null value from a 
stream.
+     */
+    private T value;
+
+    /**
+     * Constructor.
+     */
+    public InboundToken() {
+        reset();
+    }
+
+    /**
+     * Initialize this object to receive bytes from a stream to be 
deserialized.
+     */
+    public void reset() {
+        isComplete = false;
+        length = null;
+        value = null;
+    }
+
+    /**
+     * Update the state of the received bytes to be deserialized.
+     *
+     * @param byteBuf      the intermediate buffer used to cache bytes 
received from the service
+     * @param deserializer the deserializer to be used to construct the 
service response object
+     * @throws IOException on serialization failures
+     */
+    public void update(final ByteBuf byteBuf, final Deserializer<T> 
deserializer) throws IOException {
+        if ((length == null) && (byteBuf.readableBytes() >= 4)) {

Review comment:
       It would be helpful to use a named static constant to indicate why the 
number of readable bytes is compared against `4`.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
+        this.channelPool = NettyChannelPoolFactory.createChannelPool(context, 
versionNegotiator);
+        this.cacheClient = new NettyDistributedCacheClient(channelPool);
+    }
+
+    @OnDisabled
+    public void onDisabled()  {
+        logger.info("onDisabled()");
+        try {
+            this.cacheClient.invoke(new OutboundAdapter().write("close"), new 
VoidInboundAdapter());
+        } catch (IOException e) {
+            logger.warn(e.getMessage());
+        }
+        this.channelPool.close();
+        this.versionNegotiator = null;
+        this.channelPool = null;
+        this.cacheClient = null;
     }
 
     @OnStopped
     public void onStopped() throws IOException {
-        close();
+        if (isEnabled()) {
+            onDisabled();
+        }
     }
 
     @Override
     public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("putIfAbsent");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("putIfAbsent")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
-        withCommsSession(new CommsAction<Object>() {
-            @Override
-            public Object execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("put");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final boolean success = dis.readBoolean();
-                if ( !success ) {
-                    throw new IOException("Expected to receive confirmation of 
'put' request but received unexpected response");
-                }
-
-                return null;
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("put")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        if (!inboundAdapter.getResult()) {
+            throw new IOException("Expected to receive confirmation of 'put' 
request but received unexpected response");
+        }
     }
 
     @Override
     public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("containsKey");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("containsKey")
+                .write(key, keySerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("getAndPutIfAbsent");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("getAndPutIfAbsent")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("get");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("get")
+                .write(key, keySerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(session -> {
-            Map<K, V> response = new HashMap<>(keys.size());
-            try {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("subMap");
-                serialize(keys, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-
-                for (K key : keys) {
-                    final byte[] responseBuffer = 
readLengthDelimitedResponse(dis);
-                    response.put(key, 
valueDeserializer.deserialize(responseBuffer));
-                }
-            } catch (UnsupportedOperationException uoe) {
-                // If the server doesn't support subMap, just emulate it with 
multiple calls to get()
-                for (K key : keys) {
-                    response.put(key, get(key, keySerializer, 
valueDeserializer));
-                }
-            }
-
-            return response;
-        });
+        validateProtocolVersion(3);
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("subMap").write(keys.size());
+        for (K key : keys) {
+            outboundAdapter.write(key, keySerializer);
+        }
+        final MapValuesInboundAdapter<K, V> inboundAdapter =
+                new MapValuesInboundAdapter<>(keys, valueDeserializer, new 
HashMap<>());
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K> boolean remove(final K key, final Serializer<K> serializer) 
throws IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("remove");
-
-                serialize(key, serializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("remove")
+                .write(key, serializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> V removeAndGet(K key, Serializer<K> keySerializer, 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("removeAndGet");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        validateProtocolVersion(3);

Review comment:
       See note above about defining an enum for supported protocol versions.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyChannelInitializer.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * Bootstrap a new netty connection.  This performs the socket handshake used 
by the nifi distributed set /
+ * distributed map client controller services.
+ */
+public class NettyChannelInitializer extends ChannelInitializer<Channel> {

Review comment:
       Recommend a more specific name, such as `CacheClientChannelInitializer`.
   ```suggestion
   public class CacheClientChannelInitializer extends 
ChannelInitializer<Channel> {
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
+        this.channelPool = NettyChannelPoolFactory.createChannelPool(context, 
versionNegotiator);
+        this.cacheClient = new NettyDistributedCacheClient(channelPool);
+    }
+
+    @OnDisabled
+    public void onDisabled()  {
+        logger.info("onDisabled()");

Review comment:
       Changing this to debug would follow the recommendation for `onEnabled()`.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
+        this.channelPool = NettyChannelPoolFactory.createChannelPool(context, 
versionNegotiator);
+        this.cacheClient = new NettyDistributedCacheClient(channelPool);
+    }
+
+    @OnDisabled
+    public void onDisabled()  {
+        logger.info("onDisabled()");
+        try {
+            this.cacheClient.invoke(new OutboundAdapter().write("close"), new 
VoidInboundAdapter());
+        } catch (IOException e) {
+            logger.warn(e.getMessage());

Review comment:
       The stack trace should be included for troubleshooting, along with a 
more detailed error message.
   ```suggestion
               logger.warn("Map Cache Client close failed", e);
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");

Review comment:
       This seems unnecessary at the information level; changing this to debug 
and also logging the context name would be helpful.
   ```suggestion
           logger.debug("Enabling Map Cache Client Service [{}]", 
context.getName());
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyHandshakeHandler.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class NettyHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public NettyHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(0);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.info("Magic header written");

Review comment:
       This should be a debug log.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyChannelPoolFactory.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.SimpleChannelPool;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import 
org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for construction of new {@link ChannelPool}, used by distributed 
cache clients to invoke service
+ * methods.  Cache clients include the NiFi services {@link 
DistributedSetCacheClientService}
+ * and {@link DistributedMapCacheClientService}.
+ */
+public class NettyChannelPoolFactory {
+
+    /**
+     * Instantiate a new netty pool of channels to be used for distributed 
cache communications
+     *
+     * @param context           the NiFi configuration to be applied to the 
channel pool
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     * @return a channel pool object from which {@link Channel} objects may be 
obtained
+     */
+    public static ChannelPool createChannelPool(final ConfigurationContext 
context, final VersionNegotiator versionNegotiator) {
+        final PropertyValue hostname = 
context.getProperty(DistributedSetCacheClientService.HOSTNAME);
+        final PropertyValue port = 
context.getProperty(DistributedSetCacheClientService.PORT);

Review comment:
       Recommend adjusting these declarations to chain the calls for a `String` 
hostname and an `int` port.
   ```suggestion
           final String hostname = 
context.getProperty(DistributedSetCacheClientService.HOSTNAME).getValue();
           final int port = 
context.getProperty(DistributedSetCacheClientService.PORT).asInteger();
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml
##########
@@ -59,5 +59,11 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>

Review comment:
       Is the `netty-all` dependency necessary, or would one of the other more 
specific dependencies be sufficient, such as `netty-handler`?

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
+        this.channelPool = NettyChannelPoolFactory.createChannelPool(context, 
versionNegotiator);
+        this.cacheClient = new NettyDistributedCacheClient(channelPool);
+    }
+
+    @OnDisabled
+    public void onDisabled()  {
+        logger.info("onDisabled()");
+        try {
+            this.cacheClient.invoke(new OutboundAdapter().write("close"), new 
VoidInboundAdapter());
+        } catch (IOException e) {
+            logger.warn(e.getMessage());
+        }
+        this.channelPool.close();
+        this.versionNegotiator = null;
+        this.channelPool = null;
+        this.cacheClient = null;
     }
 
     @OnStopped
     public void onStopped() throws IOException {
-        close();
+        if (isEnabled()) {
+            onDisabled();
+        }
     }
 
     @Override
     public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("putIfAbsent");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("putIfAbsent")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
-        withCommsSession(new CommsAction<Object>() {
-            @Override
-            public Object execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("put");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final boolean success = dis.readBoolean();
-                if ( !success ) {
-                    throw new IOException("Expected to receive confirmation of 
'put' request but received unexpected response");
-                }
-
-                return null;
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("put")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        if (!inboundAdapter.getResult()) {
+            throw new IOException("Expected to receive confirmation of 'put' 
request but received unexpected response");

Review comment:
       It would be helpful to adjust this message so that it clearly states 
what response was expected.
   ```suggestion
               throw new IOException("Server indicated [put] operation failed");
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
+        this.channelPool = NettyChannelPoolFactory.createChannelPool(context, 
versionNegotiator);
+        this.cacheClient = new NettyDistributedCacheClient(channelPool);

Review comment:
       What do you think about refactoring the approach to encapsulate the 
ChannelPool inside the Cache Client?

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
##########
@@ -16,40 +16,37 @@
  */
 package org.apache.nifi.distributed.cache.client;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.IOUtils;
+import io.netty.channel.pool.ChannelPool;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.distributed.cache.client.adapter.BooleanInboundAdapter;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.client.adapter.VoidInboundAdapter;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.ssl.SSLContextService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 @Tags({"distributed", "cache", "state", "set", "cluster"})
 @SeeAlso(classNames = 
{"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", 
"org.apache.nifi.ssl.StandardSSLContextService"})
 @CapabilityDescription("Provides the ability to communicate with a 
DistributedSetCacheServer. This can be used in order to share a Set "
         + "between nodes in a NiFi cluster")
 public class DistributedSetCacheClientService extends 
AbstractControllerService implements DistributedSetCacheClient {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(DistributedMapCacheClientService.class);
+    private final Logger logger = LoggerFactory.getLogger(getClass());

Review comment:
       Is there a reason for not keeping this marked as `static`?

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyChannelPoolFactory.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.SimpleChannelPool;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import 
org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for construction of new {@link ChannelPool}, used by distributed 
cache clients to invoke service
+ * methods.  Cache clients include the NiFi services {@link 
DistributedSetCacheClientService}
+ * and {@link DistributedMapCacheClientService}.
+ */
+public class NettyChannelPoolFactory {

Review comment:
       Recommend renaming to something like `CacheClientChannelPoolFactory`.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedCacheClient.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.pool.ChannelPool;
+import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+
+import java.io.IOException;
+
+/**
+ * The implementation of the {@link DistributedCacheClient} using the netty 
library to provide the remote
+ * communication services.
+ */
+public class NettyDistributedCacheClient implements DistributedCacheClient {
+
+    /**
+     * The pool of network connections used to service client requests.
+     */
+    private final ChannelPool channelPool;
+
+    /**
+     * Constructor.
+     *
+     * @param channelPool the pool of network connections used to service 
client requests
+     */
+    public NettyDistributedCacheClient(final ChannelPool channelPool) {
+        this.channelPool = channelPool;
+    }
+
+    @Override
+    public void invoke(final OutboundAdapter outboundAdapter, final 
InboundAdapter inboundAdapter) throws IOException {
+        final Channel channel = 
channelPool.acquire().syncUninterruptibly().getNow();
+        try {
+            final NettyRequestHandler requestHandler = (NettyRequestHandler) 
channel.pipeline().last();
+            final byte[] message = outboundAdapter.toBytes();
+            requestHandler.invoke(channel, message, inboundAdapter);

Review comment:
       It seems like this should be refactored so that the outbound `message` 
can be sent directly to the `Channel` as opposed to finding the specific 
handler. That may require other adjustments.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyHandshakeHandler.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class NettyHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public NettyHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(0);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.info("Magic header written");
+        final int currentVersion = versionNegotiator.getVersion();
+        final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new 
OutboundAdapter().write(currentVersion).toBytes());
+        ctx.writeAndFlush(byteBufVersion);
+        logger.info("Protocol version {} proposed", 
versionNegotiator.getVersion());
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelRead(msg);
+        } else {
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            try {
+                processHandshake(ctx, byteBuf);
+            } catch (IOException | HandshakeException e) {
+                throw new IllegalStateException(e);
+            } finally {
+                byteBuf.release();
+            }
+        }
+    }
+
+    /**
+     * Negotiate distributed cache protocol version with remote service.
+     *
+     * @param ctx     the {@link Channel} context
+     * @param byteBuf the byte stream received from the remote service
+     * @throws HandshakeException on failure to negotiate protocol version
+     * @throws IOException        on write failure
+     */
+    private void processHandshake(final ChannelHandlerContext ctx, final 
ByteBuf byteBuf) throws HandshakeException, IOException {
+        final short statusCode = byteBuf.readUnsignedByte();
+        if (statusCode == ProtocolHandshake.RESOURCE_OK) {
+            logger.info("Protocol version {} accepted", 
versionNegotiator.getVersion());

Review comment:
       This should be a debug log.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyHandshakeHandler.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class NettyHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public NettyHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(0);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.info("Magic header written");
+        final int currentVersion = versionNegotiator.getVersion();
+        final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new 
OutboundAdapter().write(currentVersion).toBytes());
+        ctx.writeAndFlush(byteBufVersion);
+        logger.info("Protocol version {} proposed", 
versionNegotiator.getVersion());

Review comment:
       This should be a debug log.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyHandshakeHandler.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class NettyHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public NettyHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(0);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.info("Magic header written");
+        final int currentVersion = versionNegotiator.getVersion();
+        final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new 
OutboundAdapter().write(currentVersion).toBytes());
+        ctx.writeAndFlush(byteBufVersion);
+        logger.info("Protocol version {} proposed", 
versionNegotiator.getVersion());
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelRead(msg);
+        } else {
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            try {
+                processHandshake(ctx, byteBuf);
+            } catch (IOException | HandshakeException e) {
+                throw new IllegalStateException(e);
+            } finally {
+                byteBuf.release();
+            }
+        }
+    }
+
+    /**
+     * Negotiate distributed cache protocol version with remote service.
+     *
+     * @param ctx     the {@link Channel} context
+     * @param byteBuf the byte stream received from the remote service
+     * @throws HandshakeException on failure to negotiate protocol version
+     * @throws IOException        on write failure
+     */
+    private void processHandshake(final ChannelHandlerContext ctx, final 
ByteBuf byteBuf) throws HandshakeException, IOException {
+        final short statusCode = byteBuf.readUnsignedByte();
+        if (statusCode == ProtocolHandshake.RESOURCE_OK) {
+            logger.info("Protocol version {} accepted", 
versionNegotiator.getVersion());
+            protocol.set(versionNegotiator.getVersion());
+        } else if (statusCode == ProtocolHandshake.DIFFERENT_RESOURCE_VERSION) 
{
+            final int newVersion = byteBuf.readInt();
+            logger.info("Protocol version {} counter proposal", newVersion);
+            final Integer newPreference = 
versionNegotiator.getPreferredVersion(newVersion);
+            Optional.ofNullable(newPreference).orElseThrow(() -> new 
HandshakeException("Could not agree on protocol version"));
+            versionNegotiator.setVersion(newPreference);
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new 
OutboundAdapter().write(newPreference).toBytes()));
+        } else if (statusCode == ProtocolHandshake.ABORT) {
+            final short length = byteBuf.readShort();
+            final byte[] message = new byte[length];
+            byteBuf.readBytes(message);
+            throw new HandshakeException("Remote destination aborted 
connection with message: " + new String(message, StandardCharsets.UTF_8));
+        } else {
+            throw new HandshakeException("Unknown handshake signal: " + 
statusCode);
+        }
+    }
+
+    @Override
+    public void channelReadComplete(final ChannelHandlerContext ctx) {
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelReadComplete();
+        } else if (protocol.get() > 0) {

Review comment:
       Recommend using a named static variable instead of `0` to indicate what 
is being evaluated.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyHandshakeHandler.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class NettyHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public NettyHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(0);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.info("Magic header written");
+        final int currentVersion = versionNegotiator.getVersion();
+        final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new 
OutboundAdapter().write(currentVersion).toBytes());
+        ctx.writeAndFlush(byteBufVersion);
+        logger.info("Protocol version {} proposed", 
versionNegotiator.getVersion());
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelRead(msg);
+        } else {
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            try {
+                processHandshake(ctx, byteBuf);
+            } catch (IOException | HandshakeException e) {
+                throw new IllegalStateException(e);

Review comment:
       It would be helpful to include a message along with the exception.
   ```suggestion
                   throw new IllegalStateException("Handshake Processing Failed", e);
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -100,410 +114,180 @@
 
     @OnEnabled
     public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+        logger.info("onEnabled()");
+        this.enabled();
+        this.versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
+        this.channelPool = NettyChannelPoolFactory.createChannelPool(context, 
versionNegotiator);
+        this.cacheClient = new NettyDistributedCacheClient(channelPool);
+    }
+
+    @OnDisabled
+    public void onDisabled()  {
+        logger.info("onDisabled()");
+        try {
+            this.cacheClient.invoke(new OutboundAdapter().write("close"), new 
VoidInboundAdapter());
+        } catch (IOException e) {
+            logger.warn(e.getMessage());
+        }
+        this.channelPool.close();
+        this.versionNegotiator = null;
+        this.channelPool = null;
+        this.cacheClient = null;
     }
 
     @OnStopped
     public void onStopped() throws IOException {
-        close();
+        if (isEnabled()) {
+            onDisabled();
+        }
     }
 
     @Override
     public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("putIfAbsent");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("putIfAbsent")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
-        withCommsSession(new CommsAction<Object>() {
-            @Override
-            public Object execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("put");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final boolean success = dis.readBoolean();
-                if ( !success ) {
-                    throw new IOException("Expected to receive confirmation of 
'put' request but received unexpected response");
-                }
-
-                return null;
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("put")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        if (!inboundAdapter.getResult()) {
+            throw new IOException("Expected to receive confirmation of 'put' 
request but received unexpected response");
+        }
     }
 
     @Override
     public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("containsKey");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("containsKey")
+                .write(key, keySerializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("getAndPutIfAbsent");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("getAndPutIfAbsent")
+                .write(key, keySerializer)
+                .write(value, valueSerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("get");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("get")
+                .write(key, keySerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(session -> {
-            Map<K, V> response = new HashMap<>(keys.size());
-            try {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("subMap");
-                serialize(keys, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-
-                for (K key : keys) {
-                    final byte[] responseBuffer = 
readLengthDelimitedResponse(dis);
-                    response.put(key, 
valueDeserializer.deserialize(responseBuffer));
-                }
-            } catch (UnsupportedOperationException uoe) {
-                // If the server doesn't support subMap, just emulate it with 
multiple calls to get()
-                for (K key : keys) {
-                    response.put(key, get(key, keySerializer, 
valueDeserializer));
-                }
-            }
-
-            return response;
-        });
+        validateProtocolVersion(3);
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("subMap").write(keys.size());
+        for (K key : keys) {
+            outboundAdapter.write(key, keySerializer);
+        }
+        final MapValuesInboundAdapter<K, V> inboundAdapter =
+                new MapValuesInboundAdapter<>(keys, valueDeserializer, new 
HashMap<>());
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K> boolean remove(final K key, final Serializer<K> serializer) 
throws IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("remove");
-
-                serialize(key, serializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("remove")
+                .write(key, serializer);
+        final BooleanInboundAdapter inboundAdapter = new 
BooleanInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public <K, V> V removeAndGet(K key, Serializer<K> keySerializer, 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("removeAndGet");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        validateProtocolVersion(3);
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("removeAndGet")
+                .write(key, keySerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
     public long removeByPattern(String regex) throws IOException {
-        return withCommsSession(session -> {
-            final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-            dos.writeUTF("removeByPattern");
-            dos.writeUTF(regex);
-            dos.flush();
-
-            // read response
-            final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-            return dis.readLong();
-        });
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("removeByPattern").write(regex);
+        final LongInboundAdapter inboundAdapter = new LongInboundAdapter();
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
-    public <K, V> Map<K, V> removeByPatternAndGet(String regex, 
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws 
IOException {
-        return withCommsSession(new CommsAction<Map<K, V>>() {
-            @Override
-            public Map<K, V> execute(CommsSession session) throws IOException {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("removeByPatternAndGet");
-                dos.writeUTF(regex);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final int mapSize = dis.readInt();
-                HashMap<K, V> resultMap = new HashMap<>(mapSize);
-                for (int i=0; i<mapSize; i++) {
-                    final byte[] keyBuffer = readLengthDelimitedResponse(dis);
-                    K key = keyDeserializer.deserialize(keyBuffer);
-                    final byte[] valueBuffer = 
readLengthDelimitedResponse(dis);
-                    V value = valueDeserializer.deserialize(valueBuffer);
-                    resultMap.put(key, value);
-                }
-                return resultMap;
-            }
-        });
+    public <K, V> Map<K, V> removeByPatternAndGet(String regex, 
Deserializer<K> keyDeserializer,
+                                                  Deserializer<V> 
valueDeserializer) throws IOException {
+        validateProtocolVersion(3);
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("removeByPatternAndGet").write(regex);
+        final MapInboundAdapter<K, V> inboundAdapter =
+                new MapInboundAdapter<>(keyDeserializer, valueDeserializer, 
new HashMap<>());
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> 
keySerializer, Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(session -> {
-            validateProtocolVersion(session, 2);
-
-            final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-            dos.writeUTF("fetch");
-
-            serialize(key, keySerializer, dos);
-            dos.flush();
-
-            // read response
-            final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-            final long revision = dis.readLong();
-            final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-
-            if (revision < 0) {
-                // This indicates that key was not found.
-                return null;
-            }
-
-            return new AtomicCacheEntry(key, 
valueDeserializer.deserialize(responseBuffer), revision);
-        });
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final 
Serializer<K> keySerializer,
+                                                     final Deserializer<V> 
valueDeserializer) throws IOException {
+        validateProtocolVersion(2);
+        final OutboundAdapter outboundAdapter = new 
OutboundAdapter().write("fetch")
+                .write(key, keySerializer);
+        final AtomicCacheEntryInboundAdapter<K, V> inboundAdapter =
+                new AtomicCacheEntryInboundAdapter<>(key, valueDeserializer);
+        cacheClient.invoke(outboundAdapter, inboundAdapter);
+        return inboundAdapter.getResult();
     }
 
-    private void validateProtocolVersion(final CommsSession session, final int 
requiredProtocolVersion) {
-        if (session.getProtocolVersion() < requiredProtocolVersion) {
+    private void validateProtocolVersion(final int requiredProtocolVersion) {
+        if (versionNegotiator.getVersion() < requiredProtocolVersion) {
             throw new UnsupportedOperationException("Remote cache server 
doesn't support protocol version " + requiredProtocolVersion);
         }
     }
 
     @Override
     public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, 
Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
-        return withCommsSession(session -> {
-            validateProtocolVersion(session, 2);
-
-            final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-            dos.writeUTF("replace");
-
-            serialize(entry.getKey(), keySerializer, dos);
-            dos.writeLong(entry.getRevision().orElse(0L));
-            serialize(entry.getValue(), valueSerializer, dos);
-
-            dos.flush();
-
-            // read response
-            final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-            return dis.readBoolean();
-        });
+        validateProtocolVersion(2);
+        final OutboundAdapter outboundAdapter = new OutboundAdapter();
+        outboundAdapter.write("replace")
+                .write(entry.getKey(), keySerializer)
+                .write(entry.getRevision().orElse(0L))

Review comment:
       Recommend adding a static variable to define the default revision as `0`.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
##########
@@ -177,7 +177,11 @@ public void stop() throws IOException {
         logger.info("Stopping CacheServer {}", new Object[]{this.identifier});
 
         if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
-            serverSocketChannel.close();
+            try {
+                serverSocketChannel.close();
+            } catch (IOException e) {
+                logger.warn(e.getMessage());

Review comment:
       Recommend adding a message to this log:
   ```suggestion
                   logger.warn("Server Socket Close Failed", e);
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/LongInboundAdapter.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client.adapter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InboundAdapter} where the service response payload 
is expected to be a {@link Long}.
+ */
+public class LongInboundAdapter implements InboundAdapter {
+
+    /**
+     * Container for bytes queued from the service response {@link 
io.netty.channel.Channel}.
+     */
+    private final ByteBuf byteBuf;
+
+    /**
+     * The received service method response value.  This is set to a non-null 
value upon receipt.
+     */
+    private Long result;
+
+    /**
+     * Constructor.
+     */
+    public LongInboundAdapter() {
+        this.byteBuf = Unpooled.buffer();
+        this.result = null;
+    }
+
+    /**
+     * @return the service method response value
+     */
+    public Long getResult() {
+        return result;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return (result != null);
+    }
+
+    @Override
+    public void queue(final byte[] bytes) {
+        byteBuf.writeBytes(bytes);
+    }
+
+    @Override
+    public void dequeue() throws IOException {
+        if (byteBuf.readableBytes() >= 8) {

Review comment:
       Recommend a named variable for `8`.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
##########
@@ -16,35 +16,23 @@
  */
 package org.apache.nifi.distributed.cache.client;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-
-public interface CommsSession extends Closeable {
-
-    void setTimeout(final long value, final TimeUnit timeUnit);
-
-    InputStream getInputStream() throws IOException;
-
-    OutputStream getOutputStream() throws IOException;
-
-    boolean isClosed();
+import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
 
-    void interrupt();
-
-    String getHostname();
-
-    int getPort();
-
-    long getTimeout(TimeUnit timeUnit);
-
-    SSLContext getSSLContext();
-
-    int getProtocolVersion();
+import java.io.IOException;
 
-    void setProtocolVersion(final int protocolVersion);
+/**
+ * Encapsulate operations which may be performed using a {@link 
DistributedSetCacheClientService} or a
+ * {@link DistributedMapCacheClientService}.
+ */
+public interface DistributedCacheClient {
+
+    /**
+     * Call a service method.
+     *
+     * @param outboundAdapter the object used to assemble the service request 
byte stream
+     * @param inboundAdapter  the object used to interpret the service 
response byte stream
+     * @throws IOException on serialization failure; on communication failure
+     */
+    void invoke(OutboundAdapter outboundAdapter, InboundAdapter 
inboundAdapter) throws IOException;

Review comment:
       This is certainly a flexible interface, but it seems a bit too generic. 
What do you think about refactoring the interface to define the specific 
methods supported by the cache protocol? That would avoid requiring the caller 
to use string-based names for operations, but would provide a lower-level 
wrapper for various methods.  For example, a `put` method with a void return 
and byte array key and value. Essentially following the higher-level cache 
client interfaces, but closer to the protocol level.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import 
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verify basic functionality of {@link DistributedMapCacheClientService}, in 
the context of a TLS authenticated
+ * socket session.
+ * <p>
+ * This test instantiates both the server and client {@link 
org.apache.nifi.controller.ControllerService} objects
+ * implementing the distributed cache protocol.  It assumes that the default 
distributed cache port (4557)
+ * is available.
+ */
+public class DistributedMapCacheTlsTest {

Review comment:
       Could this test logic be migrated to the same test class, just with a 
different method to exercise TLS support?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to