This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2043dd5034 [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE
(#7494)
2043dd5034 is described below
commit 2043dd50341e0a4a2f254d72aa3109f4dfc97aac
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Oct 24 10:29:43 2023 +0800
[ISSUE #7493] Introduce a new event NettyEventType.ACTIVE (#7494)
* [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE for
ChannelEventListener
* introduce a new event NettyEventType.ACTIVE,
* implement channelActive interface for
NettyRemotingClient#NettyConnectManageHandler
* add onChannelActive for ChannelEventListener interface.
* Move send heartbeat to onChannelActive
---
.../broker/client/ClientHousekeepingService.java | 5 +++++
.../client/impl/factory/MQClientInstance.java | 20 ++++++++++++--------
.../ContainerClientHouseKeepingService.java | 11 ++++++++++-
.../controller/BrokerHousekeepingService.java | 5 +++++
.../namesrv/routeinfo/BrokerHousekeepingService.java | 5 +++++
.../proxy/remoting/ClientHousekeepingService.java | 4 ++++
.../rocketmq/remoting/ChannelEventListener.java | 2 ++
.../rocketmq/remoting/netty/NettyEventType.java | 3 ++-
.../remoting/netty/NettyRemotingAbstract.java | 3 +++
.../rocketmq/remoting/netty/NettyRemotingClient.java | 11 +++++++++++
10 files changed, 59 insertions(+), 10 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index cbb81f632b..7878d0eec5 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -87,4 +87,9 @@ public class ClientHousekeepingService implements
ChannelEventListener {
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr,
channel);
this.brokerController.getBrokerStatsManager().incChannelIdleNum();
}
+
+ @Override
+ public void onChannelActive(String remoteAddr, Channel channel) {
+
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 09534a1768..ba72a6dce7 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -159,14 +159,6 @@ public class MQClientInstance {
private final ConcurrentMap<String, HashMap<Long, String>>
brokerAddrTable = MQClientInstance.this.brokerAddrTable;
@Override
public void onChannelConnect(String remoteAddr, Channel
channel) {
- for (Map.Entry<String, HashMap<Long, String>> addressEntry
: brokerAddrTable.entrySet()) {
- for (String address :
addressEntry.getValue().values()) {
- if (address.equals(remoteAddr)) {
- sendHeartbeatToAllBrokerWithLockV2(false);
- break;
- }
- }
- }
}
@Override
@@ -180,6 +172,18 @@ public class MQClientInstance {
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
}
+
+ @Override
+ public void onChannelActive(String remoteAddr, Channel
channel) {
+ for (Map.Entry<String, HashMap<Long, String>> addressEntry
: brokerAddrTable.entrySet()) {
+ for (String address :
addressEntry.getValue().values()) {
+ if (address.equals(remoteAddr)) {
+ sendHeartbeatToAllBrokerWithLockV2(false);
+ break;
+ }
+ }
+ }
+ }
};
} else {
channelEventListener = null;
diff --git
a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
index 8bf4b4a33d..90c912247e 100644
---
a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
+++
b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
@@ -49,6 +49,11 @@ public class ContainerClientHouseKeepingService implements
ChannelEventListener
onChannelOperation(CallbackCode.IDLE, remoteAddr, channel);
}
+ @Override
+ public void onChannelActive(String remoteAddr, Channel channel) {
+ onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel);
+ }
+
private void onChannelOperation(CallbackCode callbackCode, String
remoteAddr, Channel channel) {
Collection<InnerBrokerController> masterBrokers =
this.brokerContainer.getMasterBrokers();
Collection<InnerSalveBrokerController> slaveBrokers =
this.brokerContainer.getSlaveBrokers();
@@ -103,6 +108,10 @@ public class ContainerClientHouseKeepingService implements
ChannelEventListener
/**
* onChannelIdle
*/
- IDLE
+ IDLE,
+ /**
+ * onChannelActive
+ */
+ ACTIVE
}
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
index 652a9eeb0d..d22d0b6069 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
@@ -48,4 +48,9 @@ public class BrokerHousekeepingService implements
ChannelEventListener {
public void onChannelIdle(String remoteAddr, Channel channel) {
this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
}
+
+ @Override
+ public void onChannelActive(String remoteAddr, Channel channel) {
+
+ }
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
index 80d9939923..b527429f77 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
@@ -46,4 +46,9 @@ public class BrokerHousekeepingService implements
ChannelEventListener {
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(channel);
}
+
+ @Override
+ public void onChannelActive(String remoteAddr, Channel channel) {
+
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
index e213ae8554..74eb6f2db2 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
@@ -49,5 +49,9 @@ public class ClientHousekeepingService implements
ChannelEventListener {
this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
}
+ @Override
+ public void onChannelActive(String remoteAddr, Channel channel) {
+
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
index c99133b3a2..6802e69b90 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
@@ -26,4 +26,6 @@ public interface ChannelEventListener {
void onChannelException(final String remoteAddr, final Channel channel);
void onChannelIdle(final String remoteAddr, final Channel channel);
+
+ void onChannelActive(final String remoteAddr, final Channel channel);
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
index 9ac944aad3..4bc9d57dda 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
@@ -20,5 +20,6 @@ public enum NettyEventType {
CONNECT,
CLOSE,
IDLE,
- EXCEPTION
+ EXCEPTION,
+ ACTIVE
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 07ace28ea5..62a8a72901 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -701,6 +701,9 @@ public abstract class NettyRemotingAbstract {
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
+ case ACTIVE:
+
listener.onChannelActive(event.getRemoteAddr(), event.getChannel());
+ break;
default:
break;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 340daee67e..9f15191306 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -1106,6 +1106,17 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
}
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress);
+ super.channelActive(ctx);
+
+ if (NettyRemotingClient.this.channelEventListener != null) {
+ NettyRemotingClient.this.putNettyEvent(new
NettyEvent(NettyEventType.ACTIVE, remoteAddress, ctx.channel()));
+ }
+ }
+
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise
promise) throws Exception {
final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());