exceptionfactory commented on a change in pull request #5311: URL: https://github.com/apache/nifi/pull/5311#discussion_r707607174
########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter; +import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.remote.VersionNegotiator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The {@link io.netty.channel.ChannelHandler} responsible for performing the client handshake with the + * distributed cache server. 
+ */ +public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + /** + * The header bytes used to initiate the server handshake. + */ + private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'}; + + /** + * The synchronization construct used to signal the client application that the handshake has finished. + */ + private final ChannelPromise promiseHandshakeComplete; + + /** + * The version of the protocol negotiated between the client and server. + */ + private final AtomicInteger protocol; + + /** + * The coordinator used to broker the version of the distributed cache protocol with the service. + */ + private final VersionNegotiator versionNegotiator; + + /** + * Constructor. + * + * @param channel the channel to which this {@link io.netty.channel.ChannelHandler} is bound. + * @param versionNegotiator coordinator used to broker the version of the distributed cache protocol with the service + */ + public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator) { + this.promiseHandshakeComplete = channel.newPromise(); + this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED); + this.versionNegotiator = versionNegotiator; + } + + /** + * API providing client application with visibility into the handshake process. Distributed cache requests + * should not be sent using this {@link Channel} until the handshake is complete. 
+ */ + public void waitHandshakeComplete() { + promiseHandshakeComplete.awaitUninterruptibly(); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws IOException { + final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER); + ctx.write(byteBufMagic); + logger.debug("Magic header written"); + final int currentVersion = versionNegotiator.getVersion(); + final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new OutboundAdapter().write(currentVersion).toBytes()); + ctx.writeAndFlush(byteBufVersion); + logger.debug("Protocol version {} proposed", versionNegotiator.getVersion()); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (promiseHandshakeComplete.isSuccess()) { + ctx.fireChannelRead(msg); + } else { + final ByteBuf byteBuf = (ByteBuf) msg; + try { + processHandshake(ctx, byteBuf); + } catch (IOException | HandshakeException e) { + throw new IllegalStateException("Handshake Processing Failed", e); + } finally { + byteBuf.release(); + } + } + } + + /** + * Negotiate distributed cache protocol version with remote service. 
+ * + * @param ctx the {@link Channel} context + * @param byteBuf the byte stream received from the remote service + * @throws HandshakeException on failure to negotiate protocol version + * @throws IOException on write failure + */ + private void processHandshake(final ChannelHandlerContext ctx, final ByteBuf byteBuf) throws HandshakeException, IOException { + final short statusCode = byteBuf.readUnsignedByte(); + if (statusCode == ProtocolHandshake.RESOURCE_OK) { + logger.debug("Protocol version {} accepted", versionNegotiator.getVersion()); + protocol.set(versionNegotiator.getVersion()); + } else if (statusCode == ProtocolHandshake.DIFFERENT_RESOURCE_VERSION) { + final int newVersion = byteBuf.readInt(); + logger.debug("Protocol version {} counter proposal", newVersion); Review comment: It would be helpful to clarify that this version was received in the message: ```suggestion logger.debug("Received Protocol version {} counter proposal", newVersion); ``` ########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter; + +import java.io.IOException; + +/** + * The {@link io.netty.channel.ChannelHandler} responsible for sending client requests and receiving server responses + * in the context of a distributed cache server. + */ +public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter { + + /** + * The object used to buffer and interpret the service response byte stream. + */ + private InboundAdapter inboundAdapter; + + /** + * The synchronization construct used to signal the client application that the server response has been received. + */ + private ChannelPromise channelPromise; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { + final ByteBuf byteBuf = (ByteBuf) msg; + try { + final byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + inboundAdapter.queue(bytes); + } finally { + byteBuf.release(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { + inboundAdapter.dequeue(); + if (inboundAdapter.isComplete() && !channelPromise.isSuccess()) { + channelPromise.setSuccess(); + } + } + + /** + * Perform a synchronous method call to the server. The server is expected to write + * a byte stream response to the channel, which may be deserialized into a Java object + * by the caller. 
+ * + * @param channel the network channel used to make the request + * @param message the request payload, which might be a method name, and [0..n] concatenated arguments + * @param inboundAdapter the business logic to deserialize the server response + */ + public void invoke(final Channel channel, final byte[] message, final InboundAdapter inboundAdapter) { + final CacheClientHandshakeHandler handshakeHandler = channel.pipeline().get(CacheClientHandshakeHandler.class); + handshakeHandler.waitHandshakeComplete(); + this.inboundAdapter = inboundAdapter; Review comment: Are there potential thread-safety concerns with reassigning the value of `inboundAdapter` in this method? Instead of reassigning `inboundAdapter`, would it make sense to have an internal queue of bytes, then use the passed `InboundAdapter` to process the bytes? The current implementation is not completely clear on the use of the InboundAdapter. ########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.pool.ChannelPool; +import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.pool.SimpleChannelPool; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.util.concurrent.TimeUnit; + +/** + * Factory for construction of new {@link ChannelPool}, used by distributed cache clients to invoke service + * methods. Cache clients include the NiFi services {@link DistributedSetCacheClientService} + * and {@link DistributedMapCacheClientService}. 
+ */ +public class CacheClientChannelPoolFactory { + + /** + * Instantiate a new netty pool of channels to be used for distributed cache communications + * + * @param context the NiFi configuration to be applied to the channel pool + * @param versionNegotiator coordinator used to broker the version of the distributed cache protocol with the service + * @return a channel pool object from which {@link Channel} objects may be obtained + */ + public static ChannelPool createChannelPool(final ConfigurationContext context, final VersionNegotiator versionNegotiator) { + final String hostname = context.getProperty(DistributedSetCacheClientService.HOSTNAME).getValue(); + final int port = context.getProperty(DistributedSetCacheClientService.PORT).asInteger(); + final PropertyValue timeoutMillis = context.getProperty(DistributedSetCacheClientService.COMMUNICATIONS_TIMEOUT); + final SSLContextService sslContextService = context.getProperty( + DistributedSetCacheClientService.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = (sslContextService == null) ? null : sslContextService.createContext(); + + final EventLoopGroup group = new NioEventLoopGroup(); + final Bootstrap bootstrap = new Bootstrap(); + final CacheClientChannelInitializer initializer = new CacheClientChannelInitializer(sslContext, versionNegotiator); + bootstrap.group(group) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis.asTimePeriod(TimeUnit.MILLISECONDS).intValue()) + .remoteAddress(hostname, port) + .channel(NioSocketChannel.class); + final ChannelPoolHandler channelPoolHandler = new InitializingChannelPoolHandler(initializer); + return new SimpleChannelPool(bootstrap, channelPoolHandler); Review comment: It would be better to use the `FixedChannelPool`, which supports configuring a maximum number of connections. See `NettyEventSenderFactory` for example usage with additional configuration properties. 
########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ########## @@ -99,411 +101,140 @@ } @OnEnabled - public void cacheConfig(final ConfigurationContext context) { - this.configContext = context; + public void onEnabled(final ConfigurationContext context) { + super.enabled(); + getLogger().debug("Enabling Map Cache Client Service [{}]", context.getName()); + this.versionNegotiator = new StandardVersionNegotiator( + ProtocolVersion.V3.value(), ProtocolVersion.V2.value(), ProtocolVersion.V1.value()); + this.cacheClient = new NettyDistributedMapCacheClient(context, versionNegotiator); + } + + @OnDisabled + public void onDisabled() throws IOException { + getLogger().debug("Disabling Map Cache Client Service"); + this.cacheClient.close(); + this.versionNegotiator = null; + this.cacheClient = null; + super.disabled(); Review comment: As mentioned above, this invocation should not be necessary. ########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter; +import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.remote.VersionNegotiator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The {@link io.netty.channel.ChannelHandler} responsible for performing the client handshake with the + * distributed cache server. + */ +public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + /** + * The header bytes used to initiate the server handshake. + */ + private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'}; + + /** + * The synchronization construct used to signal the client application that the handshake has finished. + */ + private final ChannelPromise promiseHandshakeComplete; + + /** + * The version of the protocol negotiated between the client and server. 
+ */ + private final AtomicInteger protocol; + + /** + * The coordinator used to broker the version of the distributed cache protocol with the service. + */ + private final VersionNegotiator versionNegotiator; + + /** + * Constructor. + * + * @param channel the channel to which this {@link io.netty.channel.ChannelHandler} is bound. + * @param versionNegotiator coordinator used to broker the version of the distributed cache protocol with the service + */ + public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator) { + this.promiseHandshakeComplete = channel.newPromise(); + this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED); + this.versionNegotiator = versionNegotiator; + } + + /** + * API providing client application with visibility into the handshake process. Distributed cache requests + * should not be sent using this {@link Channel} until the handshake is complete. + */ + public void waitHandshakeComplete() { + promiseHandshakeComplete.awaitUninterruptibly(); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws IOException { + final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER); + ctx.write(byteBufMagic); + logger.debug("Magic header written"); + final int currentVersion = versionNegotiator.getVersion(); + final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new OutboundAdapter().write(currentVersion).toBytes()); + ctx.writeAndFlush(byteBufVersion); + logger.debug("Protocol version {} proposed", versionNegotiator.getVersion()); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (promiseHandshakeComplete.isSuccess()) { + ctx.fireChannelRead(msg); + } else { + final ByteBuf byteBuf = (ByteBuf) msg; + try { + processHandshake(ctx, byteBuf); + } catch (IOException | HandshakeException e) { + throw new IllegalStateException("Handshake Processing Failed", e); + } finally { + byteBuf.release(); + } + } + } + + /** + * 
Negotiate distributed cache protocol version with remote service. + * + * @param ctx the {@link Channel} context + * @param byteBuf the byte stream received from the remote service + * @throws HandshakeException on failure to negotiate protocol version + * @throws IOException on write failure + */ + private void processHandshake(final ChannelHandlerContext ctx, final ByteBuf byteBuf) throws HandshakeException, IOException { + final short statusCode = byteBuf.readUnsignedByte(); + if (statusCode == ProtocolHandshake.RESOURCE_OK) { + logger.debug("Protocol version {} accepted", versionNegotiator.getVersion()); + protocol.set(versionNegotiator.getVersion()); + } else if (statusCode == ProtocolHandshake.DIFFERENT_RESOURCE_VERSION) { + final int newVersion = byteBuf.readInt(); + logger.debug("Protocol version {} counter proposal", newVersion); + final Integer newPreference = versionNegotiator.getPreferredVersion(newVersion); + Optional.ofNullable(newPreference).orElseThrow(() -> new HandshakeException("Could not agree on protocol version")); + versionNegotiator.setVersion(newPreference); + ctx.writeAndFlush(Unpooled.wrappedBuffer(new OutboundAdapter().write(newPreference).toBytes())); + } else if (statusCode == ProtocolHandshake.ABORT) { + final short length = byteBuf.readShort(); + final byte[] message = new byte[length]; + byteBuf.readBytes(message); + throw new HandshakeException("Remote destination aborted connection with message: " + new String(message, StandardCharsets.UTF_8)); + } else { + throw new HandshakeException("Unknown handshake signal: " + statusCode); + } + } + + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + if (promiseHandshakeComplete.isSuccess()) { + ctx.fireChannelReadComplete(); + } else if (protocol.get() > PROTOCOL_UNINITIALIZED) { + promiseHandshakeComplete.setSuccess(); + } + } + + private static final int PROTOCOL_UNINITIALIZED = 0; Review comment: Recommend moving this variable declaration to the beginning 
of the class with the other private static variables. ########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ########## @@ -99,411 +101,140 @@ } @OnEnabled - public void cacheConfig(final ConfigurationContext context) { - this.configContext = context; + public void onEnabled(final ConfigurationContext context) { + super.enabled(); Review comment: It should not be necessary to invoke this method; the framework should handle invoking all annotated methods. ########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ########## @@ -95,121 +91,51 @@ } @OnEnabled - public void onConfigured(final ConfigurationContext context) { - this.configContext = context; + public void onEnabled(final ConfigurationContext context) { + super.enabled(); Review comment: This `super.enabled()` invocation should be removed. 
########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ########## @@ -95,121 +91,51 @@ } @OnEnabled - public void onConfigured(final ConfigurationContext context) { - this.configContext = context; + public void onEnabled(final ConfigurationContext context) { + super.enabled(); + getLogger().debug("Enabling Set Cache Client Service [{}]", context.getName()); + this.versionNegotiator = new StandardVersionNegotiator(ProtocolVersion.V1.value()); + this.cacheClient = new NettyDistributedSetCacheClient(context, versionNegotiator); } - @OnStopped - public void onStopped() throws IOException { - close(); + @OnDisabled + public void onDisabled() throws IOException { + getLogger().debug("Disabling Set Cache Client Service"); + this.cacheClient.close(); + this.versionNegotiator = null; + this.cacheClient = null; + super.disabled(); Review comment: Should be removed. ########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter; +import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.remote.VersionNegotiator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The {@link io.netty.channel.ChannelHandler} responsible for performing the client handshake with the + * distributed cache server. + */ +public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + /** + * The header bytes used to initiate the server handshake. + */ + private static final byte[] MAGIC_HEADER = new byte[]{'N', 'i', 'F', 'i'}; + + /** + * The synchronization construct used to signal the client application that the handshake has finished. + */ + private final ChannelPromise promiseHandshakeComplete; + + /** + * The version of the protocol negotiated between the client and server. 
+ */ + private final AtomicInteger protocol; + + /** + * The coordinator used to broker the version of the distributed cache protocol with the service. + */ + private final VersionNegotiator versionNegotiator; + + /** + * Constructor. + * + * @param channel the channel to which this {@link io.netty.channel.ChannelHandler} is bound. + * @param versionNegotiator coordinator used to broker the version of the distributed cache protocol with the service + */ + public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator) { + this.promiseHandshakeComplete = channel.newPromise(); + this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED); + this.versionNegotiator = versionNegotiator; + } + + /** + * API providing client application with visibility into the handshake process. Distributed cache requests + * should not be sent using this {@link Channel} until the handshake is complete. + */ + public void waitHandshakeComplete() { + promiseHandshakeComplete.awaitUninterruptibly(); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws IOException { + final ByteBuf byteBufMagic = Unpooled.wrappedBuffer(MAGIC_HEADER); + ctx.write(byteBufMagic); + logger.debug("Magic header written"); + final int currentVersion = versionNegotiator.getVersion(); + final ByteBuf byteBufVersion = Unpooled.wrappedBuffer(new OutboundAdapter().write(currentVersion).toBytes()); + ctx.writeAndFlush(byteBufVersion); + logger.debug("Protocol version {} proposed", versionNegotiator.getVersion()); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (promiseHandshakeComplete.isSuccess()) { + ctx.fireChannelRead(msg); + } else { + final ByteBuf byteBuf = (ByteBuf) msg; + try { + processHandshake(ctx, byteBuf); + } catch (IOException | HandshakeException e) { + throw new IllegalStateException("Handshake Processing Failed", e); + } finally { + byteBuf.release(); + } + } + } + + /** + * 
Negotiate distributed cache protocol version with remote service. + * + * @param ctx the {@link Channel} context + * @param byteBuf the byte stream received from the remote service + * @throws HandshakeException on failure to negotiate protocol version + * @throws IOException on write failure + */ + private void processHandshake(final ChannelHandlerContext ctx, final ByteBuf byteBuf) throws HandshakeException, IOException { + final short statusCode = byteBuf.readUnsignedByte(); + if (statusCode == ProtocolHandshake.RESOURCE_OK) { + logger.debug("Protocol version {} accepted", versionNegotiator.getVersion()); + protocol.set(versionNegotiator.getVersion()); + } else if (statusCode == ProtocolHandshake.DIFFERENT_RESOURCE_VERSION) { + final int newVersion = byteBuf.readInt(); + logger.debug("Protocol version {} counter proposal", newVersion); + final Integer newPreference = versionNegotiator.getPreferredVersion(newVersion); + Optional.ofNullable(newPreference).orElseThrow(() -> new HandshakeException("Could not agree on protocol version")); Review comment: It would be helpful to include the received version in the message: ```suggestion Optional.ofNullable(newPreference).orElseThrow(() -> new HandshakeException(String.format("Received unsupported protocol version proposal [%d]", newVersion))); ``` ########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; +import org.apache.nifi.remote.VersionNegotiator; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +/** + * Bootstrap a new netty connection. This performs the socket handshake used by the nifi distributed set / + * distributed map client controller services. + */ +public class CacheClientChannelInitializer extends ChannelInitializer<Channel> { + + /** + * The (optional) secure context associated with the channel. If null, channel is not SSL protected. + */ + private final SSLContext sslContext; + + /** + * The version of the protocol negotiated between the client and server. + */ + private final VersionNegotiator versionNegotiator; Review comment: Since the `VersionNegotiator` is a stateful object, is it safe to reuse this instance across channels? It seems like a new instance should be created in `initChannel`. 
########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ########## @@ -99,411 +101,140 @@ } @OnEnabled - public void cacheConfig(final ConfigurationContext context) { - this.configContext = context; + public void onEnabled(final ConfigurationContext context) { + super.enabled(); + getLogger().debug("Enabling Map Cache Client Service [{}]", context.getName()); + this.versionNegotiator = new StandardVersionNegotiator( + ProtocolVersion.V3.value(), ProtocolVersion.V2.value(), ProtocolVersion.V1.value()); + this.cacheClient = new NettyDistributedMapCacheClient(context, versionNegotiator); + } + + @OnDisabled + public void onDisabled() throws IOException { + getLogger().debug("Disabling Map Cache Client Service"); + this.cacheClient.close(); + this.versionNegotiator = null; + this.cacheClient = null; + super.disabled(); } @OnStopped public void onStopped() throws IOException { - close(); + if (isEnabled()) { + onDisabled(); + } } @Override public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { - return withCommsSession(new CommsAction<Boolean>() { - @Override - public Boolean execute(final CommsSession session) throws IOException { - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("putIfAbsent"); - - serialize(key, keySerializer, dos); - serialize(value, valueSerializer, dos); - - dos.flush(); - - final DataInputStream dis = new DataInputStream(session.getInputStream()); - return dis.readBoolean(); - } - }); + final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); + final byte[] bytesValue = CacheClientSerde.serialize(value, valueSerializer); + return cacheClient.putIfAbsent(bytesKey, bytesValue); } @Override public <K, V> void put(final 
K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { - withCommsSession(new CommsAction<Object>() { - @Override - public Object execute(final CommsSession session) throws IOException { - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("put"); - - serialize(key, keySerializer, dos); - serialize(value, valueSerializer, dos); - - dos.flush(); - final DataInputStream dis = new DataInputStream(session.getInputStream()); - final boolean success = dis.readBoolean(); - if ( !success ) { - throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response"); - } - - return null; - } - }); + final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); + final byte[] bytesValue = CacheClientSerde.serialize(value, valueSerializer); + /* Issue the 'put' operation, as the removed implementation did, rather than 'putIfAbsent'. */ + cacheClient.put(bytesKey, bytesValue); } @Override public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { - return withCommsSession(new CommsAction<Boolean>() { - @Override - public Boolean execute(final CommsSession session) throws IOException { - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("containsKey"); - - serialize(key, keySerializer, dos); - dos.flush(); - - final DataInputStream dis = new DataInputStream(session.getInputStream()); - return dis.readBoolean(); - } - }); + final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); + return cacheClient.containsKey(bytesKey); } @Override public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException { - return withCommsSession(new CommsAction<V>() { - @Override - public V execute(final CommsSession session) throws IOException { - final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream()); - dos.writeUTF("getAndPutIfAbsent"); - - serialize(key, keySerializer, dos); - serialize(value, valueSerializer, dos); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - final byte[] responseBuffer = readLengthDelimitedResponse(dis); - return valueDeserializer.deserialize(responseBuffer); - } - }); + final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); + final byte[] bytesValue = CacheClientSerde.serialize(value, valueSerializer); + final ValueInboundAdapter<V> inboundAdapter = new ValueInboundAdapter<>(valueDeserializer); + return cacheClient.getAndPutIfAbsent(bytesKey, bytesValue, inboundAdapter); } @Override public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { - return withCommsSession(new CommsAction<V>() { - @Override - public V execute(final CommsSession session) throws IOException { - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("get"); - - serialize(key, keySerializer, dos); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - final byte[] responseBuffer = readLengthDelimitedResponse(dis); - return valueDeserializer.deserialize(responseBuffer); - } - }); + final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); + final ValueInboundAdapter<V> inboundAdapter = new ValueInboundAdapter<>(valueDeserializer); + return cacheClient.get(bytesKey, inboundAdapter); } @Override public <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { - return withCommsSession(session -> { - Map<K, V> response = new HashMap<>(keys.size()); - try { - validateProtocolVersion(session, 3); - - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("subMap"); 
- serialize(keys, keySerializer, dos); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - - for (K key : keys) { - final byte[] responseBuffer = readLengthDelimitedResponse(dis); - response.put(key, valueDeserializer.deserialize(responseBuffer)); - } - } catch (UnsupportedOperationException uoe) { - // If the server doesn't support subMap, just emulate it with multiple calls to get() - for (K key : keys) { - response.put(key, get(key, keySerializer, valueDeserializer)); - } - } - - return response; - }); + validateProtocolVersion(ProtocolVersion.V3.value()); + Collection<byte[]> bytesKeys = CacheClientSerde.serialize(keys, keySerializer); + final MapValuesInboundAdapter<K, V> inboundAdapter = + new MapValuesInboundAdapter<>(keys, valueDeserializer, new HashMap<>()); + return cacheClient.subMap(bytesKeys, inboundAdapter); } @Override public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { - return withCommsSession(new CommsAction<Boolean>() { - @Override - public Boolean execute(final CommsSession session) throws IOException { - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("remove"); - - serialize(key, serializer, dos); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - return dis.readBoolean(); - } - }); + final byte[] bytesKey = CacheClientSerde.serialize(key, serializer); + return cacheClient.remove(bytesKey); } @Override public <K, V> V removeAndGet(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { - return withCommsSession(new CommsAction<V>() { - @Override - public V execute(final CommsSession session) throws IOException { - validateProtocolVersion(session, 3); - - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("removeAndGet"); - - serialize(key, keySerializer, 
dos); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - final byte[] responseBuffer = readLengthDelimitedResponse(dis); - return valueDeserializer.deserialize(responseBuffer); - } - }); + validateProtocolVersion(ProtocolVersion.V3.value()); + final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); + final ValueInboundAdapter<V> inboundAdapter = new ValueInboundAdapter<>(valueDeserializer); + return cacheClient.removeAndGet(bytesKey, inboundAdapter); } @Override public long removeByPattern(String regex) throws IOException { - return withCommsSession(session -> { - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("removeByPattern"); - dos.writeUTF(regex); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - return dis.readLong(); - }); + return cacheClient.removeByPattern(regex); } @Override - public <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException { - return withCommsSession(new CommsAction<Map<K, V>>() { - @Override - public Map<K, V> execute(CommsSession session) throws IOException { - validateProtocolVersion(session, 3); - - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("removeByPatternAndGet"); - dos.writeUTF(regex); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - final int mapSize = dis.readInt(); - HashMap<K, V> resultMap = new HashMap<>(mapSize); - for (int i=0; i<mapSize; i++) { - final byte[] keyBuffer = readLengthDelimitedResponse(dis); - K key = keyDeserializer.deserialize(keyBuffer); - final byte[] valueBuffer = readLengthDelimitedResponse(dis); - V value = valueDeserializer.deserialize(valueBuffer); - resultMap.put(key, value); - } - return resultMap; - } - }); + 
public <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) throws IOException { + validateProtocolVersion(ProtocolVersion.V3.value()); + final MapInboundAdapter<K, V> inboundAdapter = + new MapInboundAdapter<>(keyDeserializer, valueDeserializer, new HashMap<>()); + return cacheClient.removeByPatternAndGet(regex, inboundAdapter); } @Override - @SuppressWarnings("unchecked") - public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { - return withCommsSession(session -> { - validateProtocolVersion(session, 2); - - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("fetch"); - - serialize(key, keySerializer, dos); - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - final long revision = dis.readLong(); - final byte[] responseBuffer = readLengthDelimitedResponse(dis); - - if (revision < 0) { - // This indicates that key was not found. 
- return null; - } - - return new AtomicCacheEntry(key, valueDeserializer.deserialize(responseBuffer), revision); - }); + public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, + final Deserializer<V> valueDeserializer) throws IOException { + validateProtocolVersion(ProtocolVersion.V2.value()); + final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer); + final AtomicCacheEntryInboundAdapter<K, V> inboundAdapter = + new AtomicCacheEntryInboundAdapter<>(key, valueDeserializer); + return cacheClient.fetch(bytesKey, inboundAdapter); } - private void validateProtocolVersion(final CommsSession session, final int requiredProtocolVersion) { - if (session.getProtocolVersion() < requiredProtocolVersion) { + private void validateProtocolVersion(final int requiredProtocolVersion) { + if (versionNegotiator.getVersion() < requiredProtocolVersion) { throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + requiredProtocolVersion); } } @Override public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { - return withCommsSession(session -> { - validateProtocolVersion(session, 2); - - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("replace"); - - serialize(entry.getKey(), keySerializer, dos); - dos.writeLong(entry.getRevision().orElse(0L)); - serialize(entry.getValue(), valueSerializer, dos); - - dos.flush(); - - // read response - final DataInputStream dis = new DataInputStream(session.getInputStream()); - return dis.readBoolean(); - }); + validateProtocolVersion(ProtocolVersion.V2.value()); + final byte[] bytesKey = CacheClientSerde.serialize(entry.getKey(), keySerializer); + final byte[] bytesValue = CacheClientSerde.serialize(entry.getValue(), valueSerializer); + final long revision = entry.getRevision().orElse(DEFAULT_CACHE_REVISION); + return 
cacheClient.replace(bytesKey, bytesValue, revision); } + private static final long DEFAULT_CACHE_REVISION = 0L; Review comment: This declaration should be moved to the beginning of the class, before constructors and methods. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
