greyp9 commented on a change in pull request #5311:
URL: https://github.com/apache/nifi/pull/5311#discussion_r711318485



##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
+
+import java.io.IOException;
+
+/**
+ * The {@link io.netty.channel.ChannelHandler} responsible for sending client 
requests and receiving server responses
+ * in the context of a distributed cache server.
+ */
+public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter {
+
+    /**
+     * The object used to buffer and interpret the service response byte 
stream.
+     */
+    private InboundAdapter inboundAdapter;
+
+    /**
+     * The synchronization construct used to signal the client application 
that the server response has been received.
+     */
+    private ChannelPromise channelPromise;
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException {
+        final ByteBuf byteBuf = (ByteBuf) msg;
+        try {
+            final byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(bytes);
+            inboundAdapter.queue(bytes);
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws 
IOException {
+        inboundAdapter.dequeue();
+        if (inboundAdapter.isComplete() && !channelPromise.isSuccess()) {
+            channelPromise.setSuccess();
+        }
+    }
+
+    /**
+     * Perform a synchronous method call to the server.  The server is 
expected to write
+     * a byte stream response to the channel, which may be deserialized into a 
Java object
+     * by the caller.
+     *
+     * @param channel        the network channel used to make the request
+     * @param message        the request payload, which might be a method 
name, and [0..n] concatenated arguments
+     * @param inboundAdapter the business logic to deserialize the server 
response
+     */
+    public void invoke(final Channel channel, final byte[] message, final 
InboundAdapter inboundAdapter) {
+        final CacheClientHandshakeHandler handshakeHandler = 
channel.pipeline().get(CacheClientHandshakeHandler.class);
+        handshakeHandler.waitHandshakeComplete();
+        this.inboundAdapter = inboundAdapter;

Review comment:
       This should be ok.  Each channel has its own copy of `HandshakeHandler` 
and `RequestHandler`.  There should be at most one instance of `invoke()` at a 
time associated with a given channel (this is managed at 
`DistributedCacheClient.invoke()`.  I've added a line at the end of this method 
to clarify that; also a new class `NullInboundAdaptor` to handle out-of-band 
data reads (which should never happen).
   
   `InboundAdapter` understands how to interpret the response to a particular 
API invocation (`putIfAbsent()` returns a boolean, while `getAndPutIfAbsent()` 
returns an class instance).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to