This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 6351e1957ff6ac39e0e17c899fd0f82ec4ab600f Author: kaiyi.lk <[email protected]> AuthorDate: Fri Nov 11 17:53:00 2022 +0800 [ISSUE #5485] polish channel management which is been synced from other proxy --- .../proxy/grpc/v2/DefaultGrpcMessingActivity.java | 1 + .../proxy/grpc/v2/client/ClientActivity.java | 30 +++----------- .../grpc/v2/common/GrpcClientSettingsManager.java | 48 +++++++++++++++++++++- .../proxy/processor/ReceiptHandleProcessor.java | 1 + .../proxy/processor/channel/RemoteChannel.java | 18 ++++++++ .../remoting/activity/ClientManagerActivity.java | 1 + .../proxy/remoting/channel/RemotingChannel.java | 12 ++++++ .../proxy/service/sysmessage/HeartbeatSyncer.java | 16 ++++---- .../service/sysmessage/HeartbeatSyncerData.java | 30 ++++++++++---- 9 files changed, 113 insertions(+), 44 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java index f30519d74..194b9204f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java @@ -91,6 +91,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme this.clientActivity = new ClientActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.appendStartAndShutdown(this.receiptHandleProcessor); + this.appendStartAndShutdown(this.grpcClientSettingsManager); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index 00cc862a4..de8fba4a6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -40,7 +40,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; -import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerChangeListener; import org.apache.rocketmq.broker.client.ProducerGroupEvent; @@ -436,32 +435,12 @@ public class ClientActivity extends AbstractMessingActivity { } if (args[0] instanceof ClientChannelInfo) { ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; - String clientId = clientChannelInfo.getClientId(); if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { - grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> { - if (grpcChannelManager.getChannel(clientId) == null) { - // if there is no channel connect directly to this proxy - return null; - } - return orgSettings; - }); - } else { - grpcChannelManager.removeChannel(clientId); - grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> { - ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(group); - if (consumerGroupInfo == null) { - return null; - } - List<Channel> allChannels = consumerGroupInfo.getAllChannel(); - if (allChannels == null || allChannels.isEmpty() || allChannels.size() == 1) { - // if there is only one channel of this clientId or - // there is no channel if this clientId - // remove settings of this client - return null; - } - return orgSettings; - }); + return; } + GrpcClientChannel removedChannel = grpcChannelManager.removeChannel(clientChannelInfo.getClientId()); + log.info("remove grpc channel when client unregister. group:{}, clientChannelInfo:{}, removed:{}", + group, clientChannelInfo, removedChannel != null); } } @@ -475,6 +454,7 @@ public class ClientActivity extends AbstractMessingActivity { if (ChannelHelper.isRemote(channel)) { // save settings from channel sync from other proxy Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel); + log.debug("save client settings sync from other proxy. group:{}, channelInfo:{}, settings:{}", group, clientChannelInfo, settings); if (settings == null) { return; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java index 21c3395ae..b5b82fbdc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.grpc.v2.common; import apache.rocketmq.v2.Address; import apache.rocketmq.v2.AddressScheme; +import apache.rocketmq.v2.ClientType; import apache.rocketmq.v2.CustomizedBackoff; import apache.rocketmq.v2.Endpoints; import apache.rocketmq.v2.ExponentialBackoff; @@ -29,10 +30,18 @@ import com.google.protobuf.util.Durations; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.MetricCollectorMode; import org.apache.rocketmq.proxy.config.ProxyConfig; @@ -43,8 +52,8 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; -public class GrpcClientSettingsManager { - +public class GrpcClientSettingsManager extends ServiceThread implements StartAndShutdown { + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected static final Map<String, Settings> CLIENT_SETTINGS_MAP = new ConcurrentHashMap<>(); private final MessagingProcessor messagingProcessor; @@ -205,4 +214,39 @@ public class GrpcClientSettingsManager { } return mergeMetric(settings); } + + @Override + public String getServiceName() { + return "GrpcClientSettingsManagerCleaner"; + } + + @Override + public void run() { + while (!this.isStopped()) { + this.waitForRunning(TimeUnit.SECONDS.toMillis(5)); + } + } + + @Override + protected void onWaitEnd() { + Set<String> clientIdSet = CLIENT_SETTINGS_MAP.keySet(); + for (String clientId : clientIdSet) { + try { + CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> { + if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER)) { + return settings; + } + String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup()); + ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(consumerGroup); + if (consumerGroupInfo == null || consumerGroupInfo.findChannel(clientId) == null) { + log.info("remove unused grpc client settings. group:{}, settings:{}", consumerGroupInfo, settings); + return null; + } + return settings; + }); + } catch (Throwable t) { + log.error("check expired grpc client settings failed. clientId:{}", clientId, t); + } + } + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index 9d000bfe9..5e096bc6b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -111,6 +111,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown { return; } clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); + log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java index 5d9c9afcc..fb9666afc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.proxy.processor.channel; +import com.google.common.base.MoreObjects; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import org.apache.rocketmq.proxy.service.channel.SimpleChannel; @@ -57,6 +58,13 @@ public class RemoteChannel extends SimpleChannel implements ChannelExtendAttribu public int compareTo(ChannelId o) { return this.id.compareTo(o.asLongText()); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .toString(); + } } @Override @@ -95,4 +103,14 @@ public class RemoteChannel extends SimpleChannel implements ChannelExtendAttribu public String getChannelExtendAttribute() { return this.extendAttribute; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("channelId", id()) + .add("type", type) + .add("remoteProxyIp", remoteProxyIp) + .add("extendAttribute", extendAttribute) + .toString(); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java index 62400e033..10f7fa324 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java @@ -156,6 +156,7 @@ public class ClientManagerActivity extends AbstractRemotingActivity { if (args[0] instanceof ClientChannelInfo) { ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel()); + log.info("remove remoting channel when client unregister. clientChannelInfo:{}", clientChannelInfo); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java index 2b2cfca79..8b4832cad 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.remoting.channel; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; +import com.google.common.base.MoreObjects; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import java.time.Duration; @@ -221,4 +222,15 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver ChannelProtocolType.REMOTING, this.getChannelExtendAttribute()); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("parent", parent()) + .add("clientId", clientId) + .add("remoteAddress", remoteAddress) + .add("localAddress", localAddress) + .add("subscriptionData", subscriptionData) + .toString(); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java index ce3403766..12504a2f0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java @@ -119,11 +119,11 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { messageModel, consumeFromWhere, proxyConfig.getLocalServeAddr(), - remoteChannel.encode(), - remoteChannel.getChannelExtendAttribute() + remoteChannel.encode() ); data.setSubscriptionDataSet(subList); + log.debug("sync register heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data); this.sendSystemMessage(data); } catch (Throwable t) { log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}", @@ -158,10 +158,10 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { null, null, proxyConfig.getLocalServeAddr(), - remoteChannel.encode(), - remoteChannel.getChannelExtendAttribute() + remoteChannel.encode() ); + log.debug("sync unregister heart beat. topic:{}, data:{}", this.getBroadcastTopicName(), data); this.sendSystemMessage(data); } catch (Throwable t) { log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}", @@ -187,16 +187,16 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { continue; } - RemoteChannel channel = RemoteChannel.decode(data.getChannelData()); - RemoteChannel finalChannel = channel; - channel = remoteChannelMap.computeIfAbsent(channel.id().asLongText(), key -> finalChannel); - channel.setExtendAttribute(data.getChannelExtendAttribute()); + RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData()); + RemoteChannel channel = remoteChannelMap.computeIfAbsent(decodedChannel.id().asLongText(), key -> decodedChannel); + channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute()); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( channel, data.getClientId(), data.getLanguage(), data.getVersion() ); + log.debug("start process remote channel. data:{}, clientChannelInfo:{}", data, clientChannelInfo); if (data.getHeartbeatType().equals(HeartbeatType.REGISTER)) { this.consumerManager.registerConsumer( data.getGroup(), diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java index 20fee7aac..f3b96ac9a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.proxy.service.sysmessage; +import com.google.common.base.MoreObjects; import java.util.Set; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -37,13 +38,15 @@ public class HeartbeatSyncerData { private ConsumeFromWhere consumeFromWhere; private String connectProxyIp; private String channelData; - private String channelExtendAttribute; + + public HeartbeatSyncerData() { + } public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId, LanguageCode language, int version, String group, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, String connectProxyIp, - String channelData, String channelExtendAttribute) { + String channelData) { this.heartbeatType = heartbeatType; this.clientId = clientId; this.language = language; @@ -54,7 +57,6 @@ public class HeartbeatSyncerData { this.consumeFromWhere = consumeFromWhere; this.connectProxyIp = connectProxyIp; this.channelData = channelData; - this.channelExtendAttribute = channelExtendAttribute; } public HeartbeatType getHeartbeatType() { @@ -154,11 +156,21 @@ public class HeartbeatSyncerData { this.channelData = channelData; } - public String getChannelExtendAttribute() { - return channelExtendAttribute; - } - - public void setChannelExtendAttribute(String channelExtendAttribute) { - this.channelExtendAttribute = channelExtendAttribute; + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("heartbeatType", heartbeatType) + .add("clientId", clientId) + .add("language", language) + .add("version", version) + .add("lastUpdateTimestamp", lastUpdateTimestamp) + .add("subscriptionDataSet", subscriptionDataSet) + .add("group", group) + .add("consumeType", consumeType) + .add("messageModel", messageModel) + .add("consumeFromWhere", consumeFromWhere) + .add("connectProxyIp", connectProxyIp) + .add("channelData", channelData) + .toString(); } }
