exceptionfactory commented on a change in pull request #5311:
URL: https://github.com/apache/nifi/pull/5311#discussion_r707607174



##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public CacheClientHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.debug("Magic header written");
+        final int currentVersion = versionNegotiator.getVersion();
+        final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new 
OutboundAdapter().write(currentVersion).toBytes());
+        ctx.writeAndFlush(byteBufVersion);
+        logger.debug("Protocol version {} proposed", 
versionNegotiator.getVersion());
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelRead(msg);
+        } else {
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            try {
+                processHandshake(ctx, byteBuf);
+            } catch (IOException | HandshakeException e) {
+                throw new IllegalStateException("Handshake Processing Failed", 
e);
+            } finally {
+                byteBuf.release();
+            }
+        }
+    }
+
+    /**
+     * Negotiate distributed cache protocol version with remote service.
+     *
+     * @param ctx     the {@link Channel} context
+     * @param byteBuf the byte stream received from the remote service
+     * @throws HandshakeException on failure to negotiate protocol version
+     * @throws IOException        on write failure
+     */
+    private void processHandshake(final ChannelHandlerContext ctx, final 
ByteBuf byteBuf) throws HandshakeException, IOException {
+        final short statusCode = byteBuf.readUnsignedByte();
+        if (statusCode == ProtocolHandshake.RESOURCE_OK) {
+            logger.debug("Protocol version {} accepted", 
versionNegotiator.getVersion());
+            protocol.set(versionNegotiator.getVersion());
+        } else if (statusCode == ProtocolHandshake.DIFFERENT_RESOURCE_VERSION) 
{
+            final int newVersion = byteBuf.readInt();
+            logger.debug("Protocol version {} counter proposal", newVersion);

Review comment:
       It would be helpful to clarify that this version received in the message:
   ```suggestion
               logger.debug("Received Protocol version {} counter proposal", 
newVersion);
   ```
   

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
+
+import java.io.IOException;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for sending client 
requests and receiving server responses
+ * in the context of a distributed cache server.
+ */
+public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter {
+
+    /**
+     * The object used to buffer and interpret the service response byte 
stream.
+     */
+    private InboundAdapter inboundAdapter;
+
+    /**
+     * The synchronization construct used to signal the client application 
that the server response has been received.
+     */
+    private ChannelPromise channelPromise;
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException {
+        final ByteBuf byteBuf = (ByteBuf) msg;
+        try {
+            final byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(bytes);
+            inboundAdapter.queue(bytes);
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws 
IOException {
+        inboundAdapter.dequeue();
+        if (inboundAdapter.isComplete() && !channelPromise.isSuccess()) {
+            channelPromise.setSuccess();
+        }
+    }
+
+    /**
+     * Perform a synchronous method call to the server.  The server is 
expected to write
+     * a byte stream response to the channel, which may be deserialized into a 
Java object
+     * by the caller.
+     *
+     * @param channel        the network channel used to make the request
+     * @param message        the request payload, which might be a method 
name, and [0..n] concatenated arguments
+     * @param inboundAdapter the business logic to deserialize the server 
response
+     */
+    public void invoke(final Channel channel, final byte[] message, final 
InboundAdapter inboundAdapter) {
+        final CacheClientHandshakeHandler handshakeHandler = 
channel.pipeline().get(CacheClientHandshakeHandler.class);
+        handshakeHandler.waitHandshakeComplete();
+        this.inboundAdapter = inboundAdapter;

Review comment:
       Are there potential thread-safety concerns with reassigning the value of 
`inboundAdapter` in this method?  Instead of reassigning `inboundAdapter`, 
would it make sense to have an internal queue of bytes, then use the passed 
`InboundAdapter` to process the bytes? The current implementation is not 
completely clear on the use of the InboundAdapter.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.SimpleChannelPool;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import 
org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for construction of new {@link ChannelPool}, used by distributed 
cache clients to invoke service
+ * methods.  Cache clients include the NiFi services {@link 
DistributedSetCacheClientService}
+ * and {@link DistributedMapCacheClientService}.
+ */
+public class CacheClientChannelPoolFactory {
+
+    /**
+     * Instantiate a new netty pool of channels to be used for distributed 
cache communications
+     *
+     * @param context           the NiFi configuration to be applied to the 
channel pool
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     * @return a channel pool object from which {@link Channel} objects may be 
obtained
+     */
+    public static ChannelPool createChannelPool(final ConfigurationContext 
context, final VersionNegotiator versionNegotiator) {
+        final String hostname = 
context.getProperty(DistributedSetCacheClientService.HOSTNAME).getValue();
+        final int port = 
context.getProperty(DistributedSetCacheClientService.PORT).asInteger();
+        final PropertyValue timeoutMillis = 
context.getProperty(DistributedSetCacheClientService.COMMUNICATIONS_TIMEOUT);
+        final SSLContextService sslContextService = context.getProperty(
+                
DistributedSetCacheClientService.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final SSLContext sslContext = (sslContextService == null) ? null : 
sslContextService.createContext();
+
+        final EventLoopGroup group = new NioEventLoopGroup();
+        final Bootstrap bootstrap = new Bootstrap();
+        final CacheClientChannelInitializer initializer = new 
CacheClientChannelInitializer(sslContext, versionNegotiator);
+        bootstrap.group(group)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
timeoutMillis.asTimePeriod(TimeUnit.MILLISECONDS).intValue())
+                .remoteAddress(hostname, port)
+                .channel(NioSocketChannel.class);
+        final ChannelPoolHandler channelPoolHandler = new 
InitializingChannelPoolHandler(initializer);
+        return new SimpleChannelPool(bootstrap, channelPoolHandler);

Review comment:
       It would be better to use the `FixedChannelPool`, which supports 
configuring a maximum number of connections. See `NettyEventSenderFactory` for 
example usage with additional configuration properties.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -99,411 +101,140 @@
     }
 
     @OnEnabled
-    public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+    public void onEnabled(final ConfigurationContext context) {
+        super.enabled();
+        getLogger().debug("Enabling Map Cache Client Service [{}]", 
context.getName());
+        this.versionNegotiator = new StandardVersionNegotiator(
+                ProtocolVersion.V3.value(), ProtocolVersion.V2.value(), 
ProtocolVersion.V1.value());
+        this.cacheClient = new NettyDistributedMapCacheClient(context, 
versionNegotiator);
+    }
+
+    @OnDisabled
+    public void onDisabled() throws IOException {
+        getLogger().debug("Disabling Map Cache Client Service");
+        this.cacheClient.close();
+        this.versionNegotiator = null;
+        this.cacheClient = null;
+        super.disabled();

Review comment:
       As mentioned above, this invocation should not be necessary.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public CacheClientHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.debug("Magic header written");
+        final int currentVersion = versionNegotiator.getVersion();
+        final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new 
OutboundAdapter().write(currentVersion).toBytes());
+        ctx.writeAndFlush(byteBufVersion);
+        logger.debug("Protocol version {} proposed", 
versionNegotiator.getVersion());
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelRead(msg);
+        } else {
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            try {
+                processHandshake(ctx, byteBuf);
+            } catch (IOException | HandshakeException e) {
+                throw new IllegalStateException("Handshake Processing Failed", 
e);
+            } finally {
+                byteBuf.release();
+            }
+        }
+    }
+
+    /**
+     * Negotiate distributed cache protocol version with remote service.
+     *
+     * @param ctx     the {@link Channel} context
+     * @param byteBuf the byte stream received from the remote service
+     * @throws HandshakeException on failure to negotiate protocol version
+     * @throws IOException        on write failure
+     */
+    private void processHandshake(final ChannelHandlerContext ctx, final 
ByteBuf byteBuf) throws HandshakeException, IOException {
+        final short statusCode = byteBuf.readUnsignedByte();
+        if (statusCode == ProtocolHandshake.RESOURCE_OK) {
+            logger.debug("Protocol version {} accepted", 
versionNegotiator.getVersion());
+            protocol.set(versionNegotiator.getVersion());
+        } else if (statusCode == ProtocolHandshake.DIFFERENT_RESOURCE_VERSION) 
{
+            final int newVersion = byteBuf.readInt();
+            logger.debug("Protocol version {} counter proposal", newVersion);
+            final Integer newPreference = 
versionNegotiator.getPreferredVersion(newVersion);
+            Optional.ofNullable(newPreference).orElseThrow(() -> new 
HandshakeException("Could not agree on protocol version"));
+            versionNegotiator.setVersion(newPreference);
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new 
OutboundAdapter().write(newPreference).toBytes()));
+        } else if (statusCode == ProtocolHandshake.ABORT) {
+            final short length = byteBuf.readShort();
+            final byte[] message = new byte[length];
+            byteBuf.readBytes(message);
+            throw new HandshakeException("Remote destination aborted 
connection with message: " + new String(message, StandardCharsets.UTF_8));
+        } else {
+            throw new HandshakeException("Unknown handshake signal: " + 
statusCode);
+        }
+    }
+
+    @Override
+    public void channelReadComplete(final ChannelHandlerContext ctx) {
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelReadComplete();
+        } else if (protocol.get() > PROTOCOL_UNINITIALIZED) {
+            promiseHandshakeComplete.setSuccess();
+        }
+    }
+
+    private static final int PROTOCOL_UNINITIALIZED = 0;

Review comment:
       Recommend moving this variable declaration to the beginning on the class 
with the other private static variables.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -99,411 +101,140 @@
     }
 
     @OnEnabled
-    public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+    public void onEnabled(final ConfigurationContext context) {
+        super.enabled();

Review comment:
       It should not be necessary to invoke this method, the frameworks should 
handle invoking all annotated methods.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
##########
@@ -95,121 +91,51 @@
     }
 
     @OnEnabled
-    public void onConfigured(final ConfigurationContext context) {
-        this.configContext = context;
+    public void onEnabled(final ConfigurationContext context) {
+        super.enabled();

Review comment:
       Should be removed.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
##########
@@ -95,121 +91,51 @@
     }
 
     @OnEnabled
-    public void onConfigured(final ConfigurationContext context) {
-        this.configContext = context;
+    public void onEnabled(final ConfigurationContext context) {
+        super.enabled();
+        getLogger().debug("Enabling Set Cache Client Service [{}]", 
context.getName());
+        this.versionNegotiator = new 
StandardVersionNegotiator(ProtocolVersion.V1.value());
+        this.cacheClient = new NettyDistributedSetCacheClient(context, 
versionNegotiator);
     }
 
-    @OnStopped
-    public void onStopped() throws IOException {
-        close();
+    @OnDisabled
+    public void onDisabled() throws IOException {
+        getLogger().debug("Disabling Set Cache Client Service");
+        this.cacheClient.close();
+        this.versionNegotiator = null;
+        this.cacheClient = null;
+        super.disabled();

Review comment:
       Should be removed.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for performing the 
client handshake with the
+ * distributed cache server.
+ */
+public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * The header bytes used to initiate the server handshake.
+     */
+    private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    /**
+     * The synchronization construct used to signal the client application 
that the handshake has finished.
+     */
+    private final ChannelPromise promiseHandshakeComplete;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final AtomicInteger protocol;
+
+    /**
+     * The coordinator used to broker the version of the distributed cache 
protocol with the service.
+     */
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Constructor.
+     *
+     * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
+     * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     */
+    public CacheClientHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+        this.promiseHandshakeComplete = channel.newPromise();
+        this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    /**
+     * API providing client application with visibility into the handshake 
process.  Distributed cache requests
+     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     */
+    public void waitHandshakeComplete() {
+        promiseHandshakeComplete.awaitUninterruptibly();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws 
IOException {
+        final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER);
+        ctx.write(byteBufMagic);
+        logger.debug("Magic header written");
+        final int currentVersion = versionNegotiator.getVersion();
+        final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new 
OutboundAdapter().write(currentVersion).toBytes());
+        ctx.writeAndFlush(byteBufVersion);
+        logger.debug("Protocol version {} proposed", 
versionNegotiator.getVersion());
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
+        if (promiseHandshakeComplete.isSuccess()) {
+            ctx.fireChannelRead(msg);
+        } else {
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            try {
+                processHandshake(ctx, byteBuf);
+            } catch (IOException | HandshakeException e) {
+                throw new IllegalStateException("Handshake Processing Failed", 
e);
+            } finally {
+                byteBuf.release();
+            }
+        }
+    }
+
+    /**
+     * Negotiate distributed cache protocol version with remote service.
+     *
+     * @param ctx     the {@link Channel} context
+     * @param byteBuf the byte stream received from the remote service
+     * @throws HandshakeException on failure to negotiate protocol version
+     * @throws IOException        on write failure
+     */
+    private void processHandshake(final ChannelHandlerContext ctx, final 
ByteBuf byteBuf) throws HandshakeException, IOException {
+        final short statusCode = byteBuf.readUnsignedByte();
+        if (statusCode == ProtocolHandshake.RESOURCE_OK) {
+            logger.debug("Protocol version {} accepted", 
versionNegotiator.getVersion());
+            protocol.set(versionNegotiator.getVersion());
+        } else if (statusCode == ProtocolHandshake.DIFFERENT_RESOURCE_VERSION) 
{
+            final int newVersion = byteBuf.readInt();
+            logger.debug("Protocol version {} counter proposal", newVersion);
+            final Integer newPreference = 
versionNegotiator.getPreferredVersion(newVersion);
+            Optional.ofNullable(newPreference).orElseThrow(() -> new 
HandshakeException("Could not agree on protocol version"));

Review comment:
       It would be helpful to include the received version in the message:
   ```suggestion
               Optional.ofNullable(newPreference).orElseThrow(() -> new 
HandshakeException(String.format("Received unsupported protocol version 
proposal [%d]", newVersion)));
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * Bootstrap a new netty connection.  This performs the socket handshake used 
by the nifi distributed set /
+ * distributed map client controller services.
+ */
+public class CacheClientChannelInitializer extends ChannelInitializer<Channel> 
{
+
+    /**
+     * The (optional) secure context associated with the channel.  If null, 
channel is not SSL protected.
+     */
+    private final SSLContext sslContext;
+
+    /**
+     * The version of the protocol negotiated between the client and server.
+     */
+    private final VersionNegotiator versionNegotiator;

Review comment:
       As a stateful object, is it safe to reuse this instance across channels? 
It seems like a new instance should be created in `initChannel`.

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
##########
@@ -99,411 +101,140 @@
     }
 
     @OnEnabled
-    public void cacheConfig(final ConfigurationContext context) {
-        this.configContext = context;
+    public void onEnabled(final ConfigurationContext context) {
+        super.enabled();
+        getLogger().debug("Enabling Map Cache Client Service [{}]", 
context.getName());
+        this.versionNegotiator = new StandardVersionNegotiator(
+                ProtocolVersion.V3.value(), ProtocolVersion.V2.value(), 
ProtocolVersion.V1.value());
+        this.cacheClient = new NettyDistributedMapCacheClient(context, 
versionNegotiator);
+    }
+
+    @OnDisabled
+    public void onDisabled() throws IOException {
+        getLogger().debug("Disabling Map Cache Client Service");
+        this.cacheClient.close();
+        this.versionNegotiator = null;
+        this.cacheClient = null;
+        super.disabled();
     }
 
     @OnStopped
     public void onStopped() throws IOException {
-        close();
+        if (isEnabled()) {
+            onDisabled();
+        }
     }
 
     @Override
     public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("putIfAbsent");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
+        final byte[] bytesValue = CacheClientSerde.serialize(value, 
valueSerializer);
+        return cacheClient.putIfAbsent(bytesKey, bytesValue);
     }
 
     @Override
     public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
-        withCommsSession(new CommsAction<Object>() {
-            @Override
-            public Object execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("put");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-
-                dos.flush();
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final boolean success = dis.readBoolean();
-                if ( !success ) {
-                    throw new IOException("Expected to receive confirmation of 
'put' request but received unexpected response");
-                }
-
-                return null;
-            }
-        });
+        final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
+        final byte[] bytesValue = CacheClientSerde.serialize(value, 
valueSerializer);
+        cacheClient.putIfAbsent(bytesKey, bytesValue);
     }
 
     @Override
     public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("containsKey");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
+        return cacheClient.containsKey(bytesKey);
     }
 
     @Override
     public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("getAndPutIfAbsent");
-
-                serialize(key, keySerializer, dos);
-                serialize(value, valueSerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
+        final byte[] bytesValue = CacheClientSerde.serialize(value, 
valueSerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        return cacheClient.getAndPutIfAbsent(bytesKey, bytesValue, 
inboundAdapter);
     }
 
     @Override
     public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("get");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        return cacheClient.get(bytesKey, inboundAdapter);
     }
 
     @Override
     public <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(session -> {
-            Map<K, V> response = new HashMap<>(keys.size());
-            try {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("subMap");
-                serialize(keys, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-
-                for (K key : keys) {
-                    final byte[] responseBuffer = 
readLengthDelimitedResponse(dis);
-                    response.put(key, 
valueDeserializer.deserialize(responseBuffer));
-                }
-            } catch (UnsupportedOperationException uoe) {
-                // If the server doesn't support subMap, just emulate it with 
multiple calls to get()
-                for (K key : keys) {
-                    response.put(key, get(key, keySerializer, 
valueDeserializer));
-                }
-            }
-
-            return response;
-        });
+        validateProtocolVersion(ProtocolVersion.V3.value());
+        Collection<byte[]> bytesKeys = CacheClientSerde.serialize(keys, 
keySerializer);
+        final MapValuesInboundAdapter<K, V> inboundAdapter =
+                new MapValuesInboundAdapter<>(keys, valueDeserializer, new 
HashMap<>());
+        return cacheClient.subMap(bytesKeys, inboundAdapter);
     }
 
     @Override
     public <K> boolean remove(final K key, final Serializer<K> serializer) 
throws IOException {
-        return withCommsSession(new CommsAction<Boolean>() {
-            @Override
-            public Boolean execute(final CommsSession session) throws 
IOException {
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("remove");
-
-                serialize(key, serializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                return dis.readBoolean();
-            }
-        });
+        final byte[] bytesKey = CacheClientSerde.serialize(key, serializer);
+        return cacheClient.remove(bytesKey);
     }
 
     @Override
     public <K, V> V removeAndGet(K key, Serializer<K> keySerializer, 
Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(new CommsAction<V>() {
-            @Override
-            public V execute(final CommsSession session) throws IOException {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("removeAndGet");
-
-                serialize(key, keySerializer, dos);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-                return valueDeserializer.deserialize(responseBuffer);
-            }
-        });
+        validateProtocolVersion(ProtocolVersion.V3.value());
+        final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
+        final ValueInboundAdapter<V> inboundAdapter = new 
ValueInboundAdapter<>(valueDeserializer);
+        return cacheClient.removeAndGet(bytesKey, inboundAdapter);
     }
 
     @Override
     public long removeByPattern(String regex) throws IOException {
-        return withCommsSession(session -> {
-            final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-            dos.writeUTF("removeByPattern");
-            dos.writeUTF(regex);
-            dos.flush();
-
-            // read response
-            final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-            return dis.readLong();
-        });
+        return cacheClient.removeByPattern(regex);
     }
 
     @Override
-    public <K, V> Map<K, V> removeByPatternAndGet(String regex, 
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws 
IOException {
-        return withCommsSession(new CommsAction<Map<K, V>>() {
-            @Override
-            public Map<K, V> execute(CommsSession session) throws IOException {
-                validateProtocolVersion(session, 3);
-
-                final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-                dos.writeUTF("removeByPatternAndGet");
-                dos.writeUTF(regex);
-                dos.flush();
-
-                // read response
-                final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-                final int mapSize = dis.readInt();
-                HashMap<K, V> resultMap = new HashMap<>(mapSize);
-                for (int i=0; i<mapSize; i++) {
-                    final byte[] keyBuffer = readLengthDelimitedResponse(dis);
-                    K key = keyDeserializer.deserialize(keyBuffer);
-                    final byte[] valueBuffer = 
readLengthDelimitedResponse(dis);
-                    V value = valueDeserializer.deserialize(valueBuffer);
-                    resultMap.put(key, value);
-                }
-                return resultMap;
-            }
-        });
+    public <K, V> Map<K, V> removeByPatternAndGet(String regex, 
Deserializer<K> keyDeserializer,
+                                                  Deserializer<V> 
valueDeserializer) throws IOException {
+        validateProtocolVersion(ProtocolVersion.V3.value());
+        final MapInboundAdapter<K, V> inboundAdapter =
+                new MapInboundAdapter<>(keyDeserializer, valueDeserializer, 
new HashMap<>());
+        return cacheClient.removeByPatternAndGet(regex, inboundAdapter);
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> 
keySerializer, Deserializer<V> valueDeserializer) throws IOException {
-        return withCommsSession(session -> {
-            validateProtocolVersion(session, 2);
-
-            final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-            dos.writeUTF("fetch");
-
-            serialize(key, keySerializer, dos);
-            dos.flush();
-
-            // read response
-            final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-            final long revision = dis.readLong();
-            final byte[] responseBuffer = readLengthDelimitedResponse(dis);
-
-            if (revision < 0) {
-                // This indicates that key was not found.
-                return null;
-            }
-
-            return new AtomicCacheEntry(key, 
valueDeserializer.deserialize(responseBuffer), revision);
-        });
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final 
Serializer<K> keySerializer,
+                                                     final Deserializer<V> 
valueDeserializer) throws IOException {
+        validateProtocolVersion(ProtocolVersion.V2.value());
+        final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
+        final AtomicCacheEntryInboundAdapter<K, V> inboundAdapter =
+                new AtomicCacheEntryInboundAdapter<>(key, valueDeserializer);
+        return cacheClient.fetch(bytesKey, inboundAdapter);
     }
 
-    private void validateProtocolVersion(final CommsSession session, final int 
requiredProtocolVersion) {
-        if (session.getProtocolVersion() < requiredProtocolVersion) {
+    private void validateProtocolVersion(final int requiredProtocolVersion) {
+        if (versionNegotiator.getVersion() < requiredProtocolVersion) {
             throw new UnsupportedOperationException("Remote cache server 
doesn't support protocol version " + requiredProtocolVersion);
         }
     }
 
     @Override
     public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, 
Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
-        return withCommsSession(session -> {
-            validateProtocolVersion(session, 2);
-
-            final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
-            dos.writeUTF("replace");
-
-            serialize(entry.getKey(), keySerializer, dos);
-            dos.writeLong(entry.getRevision().orElse(0L));
-            serialize(entry.getValue(), valueSerializer, dos);
-
-            dos.flush();
-
-            // read response
-            final DataInputStream dis = new 
DataInputStream(session.getInputStream());
-            return dis.readBoolean();
-        });
+        validateProtocolVersion(ProtocolVersion.V2.value());
+        final byte[] bytesKey = CacheClientSerde.serialize(entry.getKey(), 
keySerializer);
+        final byte[] bytesValue = CacheClientSerde.serialize(entry.getValue(), 
valueSerializer);
+        final long revision = 
entry.getRevision().orElse(DEFAULT_CACHE_REVISION);
+        return cacheClient.replace(bytesKey, bytesValue, revision);
     }
 
+    private static final long DEFAULT_CACHE_REVISION = 0L;

Review comment:
       This declaration should be moved to the beginning of the class, before 
constructors and methods.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to