greyp9 commented on a change in pull request #5311: URL: https://github.com/apache/nifi/pull/5311#discussion_r711318485
########## File path: nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter; + +import java.io.IOException; + +/** + * The {@link io.netty.channel.ChannelHandler} responsible for sending client requests and receiving server responses + * in the context of a distributed cache server. + */ +public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter { + + /** + * The object used to buffer and interpret the service response byte stream. + */ + private InboundAdapter inboundAdapter; + + /** + * The synchronization construct used to signal the client application that the server response has been received. + */ + private ChannelPromise channelPromise; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { + final ByteBuf byteBuf = (ByteBuf) msg; + try { + final byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + inboundAdapter.queue(bytes); + } finally { + byteBuf.release(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { + inboundAdapter.dequeue(); + if (inboundAdapter.isComplete() && !channelPromise.isSuccess()) { + channelPromise.setSuccess(); + } + } + + /** + * Perform a synchronous method call to the server. The server is expected to write + * a byte stream response to the channel, which may be deserialized into a Java object + * by the caller. + * + * @param channel the network channel used to make the request + * @param message the request payload, which might be a method name, and [0..n] concatenated arguments + * @param inboundAdapter the business logic to deserialize the server response + */ + public void invoke(final Channel channel, final byte[] message, final InboundAdapter inboundAdapter) { + final CacheClientHandshakeHandler handshakeHandler = channel.pipeline().get(CacheClientHandshakeHandler.class); + handshakeHandler.waitHandshakeComplete(); + this.inboundAdapter = inboundAdapter; Review comment: This should be ok. Each channel has its own copy of `HandshakeHandler` and `RequestHandler`. There should be at most one instance of `invoke()` at a time associated with a given channel (this is managed at `DistributedCacheClient.invoke()`. I've added a line at the end of this method to clarify that; also a new class `NullInboundAdaptor` to handle out-of-band data reads (which should never happen). `InboundAdapter` understands how to interpret the response to a particular API invocation (`putIfAbsent()` returns a boolean, while `getAndPutIfAbsent()` returns an class instance). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
