This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 4a1eea7 [STORM-3763] send initial message to remote client only after
authentication completes (#3390)
4a1eea7 is described below
commit 4a1eea700766da2f175ac7eaba6064f0d7f0ff03
Author: Meng (Ethan) Li <[email protected]>
AuthorDate: Thu Apr 1 11:19:52 2021 -0500
[STORM-3763] send initial message to remote client only after
authentication completes (#3390)
---
.../src/jvm/org/apache/storm/messaging/IContext.java | 3 ++-
.../jvm/org/apache/storm/messaging/netty/Server.java | 18 +++++++++++++++---
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index 8b1183b..58b0975 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -55,7 +55,8 @@ public interface IContext {
* @param stormId topology ID
* @param port port #
* @param cb The callback to deliver received messages to
- * @param newConnectionResponse Supplier of the initial message to send to new client connections
+ * @param newConnectionResponse Supplier of the initial message to send to new client connections. If authentication
+ * is required, the message will be sent after authentication is complete.
* @return server side connection
*/
IConnection bind(String stormId, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 94c5d75..0e99257 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -65,16 +65,19 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
private final IConnectionCallback cb;
private final Supplier<Object> newConnectionResponse;
private volatile boolean closing = false;
+ private final boolean isNettyAuthRequired;
/**
* Starts Netty at the given port.
* @param topoConf The topology config
* @param port The port to start Netty at
* @param cb The callback to deliver incoming messages to
- * @param newConnectionResponse The response to send to clients when they connect. Can be null.
+ * @param newConnectionResponse The response to send to clients when they connect. Can be null. If authentication
+ * is required, the message will be sent after authentication is complete.
*/
Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
this.topoConf = topoConf;
+ this.isNettyAuthRequired = (Boolean) topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
this.port = port;
ser = new KryoValuesSerializer(topoConf);
this.cb = cb;
@@ -252,8 +255,9 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
**/
@Override
public void channelActive(Channel c) {
- if (newConnectionResponse != null) {
- c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+ if (!isNettyAuthRequired) {
+ //if authentication is not required, treat it as authenticated.
+ authenticated(c);
}
allChannels.add(c);
}
@@ -285,6 +289,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
@Override
public void authenticated(Channel c) {
+ if (isNettyAuthRequired) {
+ LOG.debug("The channel {} is active and authenticated", c);
+ } else {
+ LOG.debug("The channel {} is active", c);
+ }
+ if (newConnectionResponse != null) {
+ c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+ }
}
@Override