This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a1eea7  [STORM-3763] send initial message to remote client only after 
authentication completes (#3390)
4a1eea7 is described below

commit 4a1eea700766da2f175ac7eaba6064f0d7f0ff03
Author: Meng (Ethan) Li <[email protected]>
AuthorDate: Thu Apr 1 11:19:52 2021 -0500

    [STORM-3763] send initial message to remote client only after 
authentication completes (#3390)
---
 .../src/jvm/org/apache/storm/messaging/IContext.java   |  3 ++-
 .../jvm/org/apache/storm/messaging/netty/Server.java   | 18 +++++++++++++++---
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java 
b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index 8b1183b..58b0975 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -55,7 +55,8 @@ public interface IContext {
      * @param stormId topology ID
      * @param port     port #
      * @param cb The callback to deliver received messages to
-     * @param newConnectionResponse Supplier of the initial message to send to 
new client connections
+     * @param newConnectionResponse Supplier of the initial message to send to 
new client connections. If authentication
+     *                              is required, the message will be sent 
after authentication is complete.
      * @return server side connection
      */
     IConnection bind(String stormId, int port, IConnectionCallback cb, 
Supplier<Object> newConnectionResponse);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 94c5d75..0e99257 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -65,16 +65,19 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
     private final IConnectionCallback cb;
     private final Supplier<Object> newConnectionResponse;
     private volatile boolean closing = false;
+    private final boolean isNettyAuthRequired;
 
     /**
      * Starts Netty at the given port.
      * @param topoConf The topology config
      * @param port The port to start Netty at
      * @param cb The callback to deliver incoming messages to
-     * @param newConnectionResponse The response to send to clients when they 
connect. Can be null.
+     * @param newConnectionResponse The response to send to clients when they 
connect. Can be null. If authentication
+     *                              is required, the message will be sent 
after authentication is complete.
      */
     Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, 
Supplier<Object> newConnectionResponse) {
         this.topoConf = topoConf;
+        this.isNettyAuthRequired = (Boolean) 
topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
         this.port = port;
         ser = new KryoValuesSerializer(topoConf);
         this.cb = cb;
@@ -252,8 +255,9 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
      **/
     @Override
     public void channelActive(Channel c) {
-        if (newConnectionResponse != null) {
-            c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+        if (!isNettyAuthRequired) {
+            //if authentication is not required, treat it as authenticated.
+            authenticated(c);
         }
         allChannels.add(c);
     }
@@ -285,6 +289,14 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
 
     @Override
     public void authenticated(Channel c) {
+        if (isNettyAuthRequired) {
+            LOG.debug("The channel {} is active and authenticated", c);
+        } else {
+            LOG.debug("The channel {} is active", c);
+        }
+        if (newConnectionResponse != null) {
+            c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+        }
     }
 
     @Override

Reply via email to