dongeforever commented on code in PR #4446:
URL: https://github.com/apache/rocketmq/pull/4446#discussion_r895368602


##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java:
##########
@@ -166,22 +173,50 @@ public void processMessageReceived(ChannelHandlerContext 
ctx, RemotingCommand ms
         }
     }
 
-    protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
-        if (rpcHooks.size() > 0) {
-            for (RPCHook rpcHook: rpcHooks) {
-                rpcHook.doBeforeRequest(addr, request);
+    protected Decision preHandle(final HandlerContext context, final 
RemotingCommand request,
+                                 final CompletableFuture<RemotingCommand> 
responseFuture) {
+        Decision decision = Decision.CONTINUE;
+        for (Handler handler : handlers) {
+            try {
+                decision = handler.preHandle(context, request, responseFuture);
+            } catch (Throwable e) {
+                RemotingCommand response  = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
+                    e.getMessage());
+                response.setOpaque(request.getOpaque());
+                responseFuture.complete(response);
+                decision = Decision.STOP;
+            }
+
+            if (Decision.STOP == decision) {
+                break;
             }
         }
+        return decision;
     }
 
-    protected void doAfterRpcHooks(String addr, RemotingCommand request, 
RemotingCommand response) {
-        if (rpcHooks.size() > 0) {
-            for (RPCHook rpcHook: rpcHooks) {
-                rpcHook.doAfterResponse(addr, request, response);
+    protected Decision postHandle(final HandlerContext context, final 
RemotingCommand request, final RemotingCommand response) {
+        Decision decision = Decision.CONTINUE;
+        for (Handler handler : handlers) {
+            try {
+                decision = handler.postHandle(context, request, response);
+            } catch (Throwable ignore) {
+            }
+            if (Decision.STOP == decision) {
+                break;
             }
         }
+        return decision;
     }
 
+    public void registerRPCHook(RPCHook rpcHook) {
+        if (null != rpcHook) {
+            handlers.add(new HandlerAdaptor(rpcHook));
+        }

Review Comment:
   If the RPCHook is not exposed to the end-user, this Adapter is great.
   
   But currently, the widely used DefaultMQProduer or  DefaultMQConsumer, or  
DefaultAdminExt have the access to RPCHook. 
   It is better to come up with other way to solve it.



##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java:
##########
@@ -166,22 +173,50 @@ public void processMessageReceived(ChannelHandlerContext 
ctx, RemotingCommand ms
         }
     }
 
-    protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
-        if (rpcHooks.size() > 0) {
-            for (RPCHook rpcHook: rpcHooks) {
-                rpcHook.doBeforeRequest(addr, request);
+    protected Decision preHandle(final HandlerContext context, final 
RemotingCommand request,
+                                 final CompletableFuture<RemotingCommand> 
responseFuture) {
+        Decision decision = Decision.CONTINUE;
+        for (Handler handler : handlers) {
+            try {
+                decision = handler.preHandle(context, request, responseFuture);
+            } catch (Throwable e) {
+                RemotingCommand response  = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
+                    e.getMessage());
+                response.setOpaque(request.getOpaque());
+                responseFuture.complete(response);
+                decision = Decision.STOP;
+            }
+
+            if (Decision.STOP == decision) {
+                break;
             }
         }
+        return decision;
     }
 
-    protected void doAfterRpcHooks(String addr, RemotingCommand request, 
RemotingCommand response) {
-        if (rpcHooks.size() > 0) {
-            for (RPCHook rpcHook: rpcHooks) {
-                rpcHook.doAfterResponse(addr, request, response);
+    protected Decision postHandle(final HandlerContext context, final 
RemotingCommand request, final RemotingCommand response) {
+        Decision decision = Decision.CONTINUE;
+        for (Handler handler : handlers) {
+            try {
+                decision = handler.postHandle(context, request, response);
+            } catch (Throwable ignore) {
+            }
+            if (Decision.STOP == decision) {
+                break;
             }
         }
+        return decision;
     }
 
+    public void registerRPCHook(RPCHook rpcHook) {
+        if (null != rpcHook) {
+            handlers.add(new HandlerAdaptor(rpcHook));
+        }

Review Comment:
   The flow-control is very useful, So why not apply the ability to the 
RPCHook, which could benefit the users without adding a new interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to