This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 56551b596b3f2f7b3e16e5f271200bceaf28734a Author: kaiyi.lk <[email protected]> AuthorDate: Tue Nov 8 17:18:24 2022 +0800 [ISSUE #5485] client connection management --- .../rocketmq/broker/client/ConsumerManager.java | 13 +- .../broker/client/ConsumerManagerInterface.java | 60 ++++++ .../rocketmq/broker/client/ProducerManager.java | 10 + .../broker/client/ProducerManagerInterface.java | 44 ++++ .../proxy/common/channel/ChannelHelper.java | 49 +++++ .../apache/rocketmq/proxy/config/ProxyConfig.java | 49 +++++ .../proxy/grpc/v2/DefaultGrpcMessingActivity.java | 2 +- .../proxy/grpc/v2/channel/GrpcChannelManager.java | 7 +- .../proxy/grpc/v2/channel/GrpcClientChannel.java | 71 ++++++- .../proxy/grpc/v2/client/ClientActivity.java | 76 ++++++- .../grpc/v2/common/GrpcClientSettingsManager.java | 9 + .../rocketmq/proxy/processor/ClientProcessor.java | 5 + .../proxy/processor/DefaultMessagingProcessor.java | 5 + .../proxy/processor/MessagingProcessor.java | 2 + .../proxy/processor/ReceiptHandleProcessor.java | 5 + .../channel/ChannelExtendAttributeGetter.java | 23 +++ .../processor/channel/ChannelProtocolType.java | 35 ++++ .../proxy/processor/channel/RemoteChannel.java | 98 +++++++++ .../processor/channel/RemoteChannelConverter.java | 23 +++ .../processor/channel/RemoteChannelSerializer.java | 65 ++++++ .../proxy/remoting/ClientHousekeepingService.java | 53 +++++ .../proxy/remoting/RemotingProtocolServer.java | 72 +++++++ .../proxy/remoting/RemotingProxyOutClient.java | 27 +++ .../remoting/activity/ClientManagerActivity.java | 178 ++++++++++++++++ .../proxy/remoting/channel/RemotingChannel.java | 224 +++++++++++++++++++++ .../remoting/channel/RemotingChannelManager.java | 142 +++++++++++++ .../proxy/remoting/common/RemotingConverter.java | 74 +++++++ .../proxy/service/ClusterServiceManager.java | 17 +- .../proxy/service/LocalServiceManager.java | 11 +- .../rocketmq/proxy/service/ServiceManager.java | 11 +- .../rocketmq/proxy/service/admin/AdminService.java | 32 +++ 
.../proxy/service/admin/DefaultAdminService.java | 142 +++++++++++++ .../proxy/service/channel/SimpleChannel.java | 8 + .../service/client/ClusterConsumerManager.java | 70 +++++++ .../sysmessage/AbstractSystemMessageSyncer.java | 190 +++++++++++++++++ .../proxy/service/sysmessage/HeartbeatSyncer.java | 224 +++++++++++++++++++++ .../service/sysmessage/HeartbeatSyncerData.java | 164 +++++++++++++++ .../proxy/service/sysmessage/HeartbeatType.java | 23 +++ .../mqclient/ProxyClientRemotingProcessorTest.java | 5 +- 39 files changed, 2285 insertions(+), 33 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 5f95ac1af..0582ce75e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -64,6 +64,7 @@ public class ConsumerManager { this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout(); } + @Override public ClientChannelInfo findChannel(final String group, final String clientId) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { @@ -72,6 +73,7 @@ public class ConsumerManager { return null; } + @Override public ClientChannelInfo findChannel(final String group, final Channel channel) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { @@ -80,6 +82,7 @@ public class ConsumerManager { return null; } + @Override public SubscriptionData findSubscriptionData(final String group, final String topic) { return findSubscriptionData(group, topic, true); } @@ -107,6 +110,7 @@ public class ConsumerManager { return this.consumerTable; } + @Override public ConsumerGroupInfo getConsumerGroupInfo(final String group) { return getConsumerGroupInfo(group, false); } @@ -119,6 +123,7 @@ public class ConsumerManager { return 
consumerGroupInfo; } + @Override public int findSubscriptionDataCount(final String group) { ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group); if (consumerGroupInfo != null) { @@ -128,6 +133,7 @@ public class ConsumerManager { return 0; } + @Override public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { boolean removed = false; Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); @@ -172,6 +178,7 @@ public class ConsumerManager { isNotifyConsumerIdsChangedEnable, true); } + @Override public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) { @@ -202,11 +209,12 @@ public class ConsumerManager { this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start)); } - callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList); + callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, group, subList, clientChannelInfo); return r1 || r2; } + @Override public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); @@ -252,6 +260,7 @@ public class ConsumerManager { } } + @Override public void scanNotActiveChannel() { Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { @@ -286,6 +295,7 @@ public class ConsumerManager { removeExpireConsumerGroupInfo(); } + @Override public HashSet<String> queryTopicConsumeByWho(final String topic) { HashSet<String> groups = new HashSet<>(); Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); @@ -300,6 +310,7 @@ public class ConsumerManager { 
return groups; } + @Override public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) { consumerIdsChangeListenerList.add(listener); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java new file mode 100644 index 000000000..895a2e491 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.broker.client; + +import io.netty.channel.Channel; +import java.util.Set; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; + +public interface ConsumerManagerInterface { + + ClientChannelInfo findChannel(String group, String clientId); + + ClientChannelInfo findChannel(String group, Channel channel); + + SubscriptionData findSubscriptionData(String group, String topic); + + ConsumerGroupInfo getConsumerGroupInfo(String group); + + int findSubscriptionDataCount(String group); + + boolean doChannelCloseEvent(String remoteAddr, Channel channel); + + default boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo, + ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, + Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { + return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, + isNotifyConsumerIdsChangedEnable, true); + } + + boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo, + ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, + Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription); + + void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo, + boolean isNotifyConsumerIdsChangedEnable); + + void scanNotActiveChannel(); + + Set<String> queryTopicConsumeByWho(String topic); + + void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener); +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 52d67bf28..047aa8be9 100644 --- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -54,10 +54,12 @@ public class ProducerManager { this.brokerStatsManager = brokerStatsManager; } + @Override public int groupSize() { return this.groupChannelTable.size(); } + @Override public boolean groupOnline(String group) { Map<Channel, ClientChannelInfo> channels = this.groupChannelTable.get(group); return channels != null && !channels.isEmpty(); @@ -67,6 +69,7 @@ public class ProducerManager { return groupChannelTable; } + @Override public ProducerTableInfo getProducerTable() { Map<String, List<ProducerInfo>> map = new HashMap<>(); for (String group : this.groupChannelTable.keySet()) { @@ -94,6 +97,7 @@ public class ProducerManager { return new ProducerTableInfo(map); } + @Override public void scanNotActiveChannel() { Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator(); @@ -129,6 +133,7 @@ public class ProducerManager { } } + @Override public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { boolean removed = false; if (channel != null) { @@ -160,6 +165,7 @@ public class ProducerManager { return removed; } + @Override public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { ClientChannelInfo clientChannelInfoFound = null; @@ -183,6 +189,7 @@ public class ProducerManager { } } + @Override public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); if (null != channelTable && !channelTable.isEmpty()) { @@ -202,6 +209,7 @@ public class ProducerManager { } } + @Override public Channel getAvailableChannel(String groupId) { if (groupId == null) { return null; @@ -242,6 +250,7 @@ public class 
ProducerManager { return lastActiveChannel; } + @Override public Channel findChannel(String clientId) { return clientChannelTable.get(clientId); } @@ -257,6 +266,7 @@ public class ProducerManager { } } + @Override public void appendProducerChangeListener(ProducerChangeListener producerChangeListener) { producerChangeListenerList.add(producerChangeListener); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java new file mode 100644 index 000000000..3f2ece7cd --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.broker.client; + +import io.netty.channel.Channel; +import org.apache.rocketmq.common.protocol.body.ProducerTableInfo; + +public interface ProducerManagerInterface { + + int groupSize(); + + boolean groupOnline(String group); + + ProducerTableInfo getProducerTable(); + + void scanNotActiveChannel(); + + boolean doChannelCloseEvent(String remoteAddr, Channel channel); + + void registerProducer(String group, ClientChannelInfo clientChannelInfo); + + void unregisterProducer(String group, ClientChannelInfo clientChannelInfo); + + Channel getAvailableChannel(String groupId); + + Channel findChannel(String clientId); + + void appendProducerChangeListener(ProducerChangeListener producerChangeListener); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java new file mode 100644 index 000000000..dd15c85fb --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/channel/ChannelHelper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.common.channel; + +import io.netty.channel.Channel; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; +import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; + +public class ChannelHelper { + + /** + * judge channel is sync from other proxy or not + * + * @param channel channel + * @return true if is sync from other proxy + */ + public static boolean isRemote(Channel channel) { + return channel instanceof RemoteChannel; + } + + public static ChannelProtocolType getChannelProtocolType(Channel channel) { + if (channel instanceof GrpcClientChannel) { + return ChannelProtocolType.GRPC_V2; + } else if (channel instanceof RemotingChannel) { + return ChannelProtocolType.REMOTING; + } else if (channel instanceof RemoteChannel) { + RemoteChannel remoteChannel = (RemoteChannel) channel; + return remoteChannel.getType(); + } + return ChannelProtocolType.UNKNOWN; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index cbedc3c50..0efca05b4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -34,6 +34,8 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.ProxyMode; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; public class ProxyConfig implements ConfigFile { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -55,6 +57,12 @@ public class ProxyConfig implements ConfigFile { private String 
proxyClusterName = DEFAULT_CLUSTER_NAME; private String proxyName = StringUtils.isEmpty(localHostName) ? "DEFAULT_PROXY" : localHostName; + private String localServeAddr = ""; + + private String systemTopicClusterName = ""; + private int heartbeatSyncerThreadPoolNums = 4; + private int heartbeatSyncerThreadPoolQueueCapacity = 100; + /** * configuration for ThreadPoolMonitor */ @@ -204,6 +212,15 @@ public class ProxyConfig implements ConfigFile { @Override public void initData() { parseDelayLevel(); + if (StringUtils.isEmpty(localServeAddr)) { + this.localServeAddr = RemotingUtil.getLocalAddress(); + } + if (StringUtils.isBlank(localServeAddr)) { + throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "get local serve ip failed"); + } + if (StringUtils.isBlank(systemTopicClusterName)) { + this.systemTopicClusterName = this.rocketMQClusterName; + } } public int computeDelayLevel(long timeMillis) { @@ -267,6 +284,38 @@ public class ProxyConfig implements ConfigFile { this.proxyName = proxyName; } + public String getLocalServeAddr() { + return localServeAddr; + } + + public void setLocalServeAddr(String localServeAddr) { + this.localServeAddr = localServeAddr; + } + + public String getSystemTopicClusterName() { + return systemTopicClusterName; + } + + public void setSystemTopicClusterName(String systemTopicClusterName) { + this.systemTopicClusterName = systemTopicClusterName; + } + + public int getHeartbeatSyncerThreadPoolNums() { + return heartbeatSyncerThreadPoolNums; + } + + public void setHeartbeatSyncerThreadPoolNums(int heartbeatSyncerThreadPoolNums) { + this.heartbeatSyncerThreadPoolNums = heartbeatSyncerThreadPoolNums; + } + + public int getHeartbeatSyncerThreadPoolQueueCapacity() { + return heartbeatSyncerThreadPoolQueueCapacity; + } + + public void setHeartbeatSyncerThreadPoolQueueCapacity(int heartbeatSyncerThreadPoolQueueCapacity) { + this.heartbeatSyncerThreadPoolQueueCapacity = heartbeatSyncerThreadPoolQueueCapacity; + } + public boolean 
isEnablePrintJstack() { return enablePrintJstack; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java index 81a819007..f30519d74 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java @@ -78,7 +78,7 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme protected void init(MessagingProcessor messagingProcessor) { this.grpcClientSettingsManager = new GrpcClientSettingsManager(messagingProcessor); - this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService()); + this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService(), this.grpcClientSettingsManager); this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java index 97a0ae6da..fb6df2562 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java @@ -30,12 +30,14 @@ import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; import 
org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.remoting.protocol.ResponseCode; public class GrpcChannelManager implements StartAndShutdown { private final ProxyRelayService proxyRelayService; + private final GrpcClientSettingsManager grpcClientSettingsManager; protected final ConcurrentMap<String, GrpcClientChannel> clientIdChannelMap = new ConcurrentHashMap<>(); protected final AtomicLong nonceIdGenerator = new AtomicLong(0); @@ -45,8 +47,9 @@ public class GrpcChannelManager implements StartAndShutdown { new ThreadFactoryImpl("GrpcChannelManager_") ); - public GrpcChannelManager(ProxyRelayService proxyRelayService) { + public GrpcChannelManager(ProxyRelayService proxyRelayService, GrpcClientSettingsManager grpcClientSettingsManager) { this.proxyRelayService = proxyRelayService; + this.grpcClientSettingsManager = grpcClientSettingsManager; } protected void init() { @@ -58,7 +61,7 @@ public class GrpcChannelManager implements StartAndShutdown { public GrpcClientChannel createChannel(ProxyContext ctx, String clientId) { return this.clientIdChannelMap.computeIfAbsent(clientId, - k -> new GrpcClientChannel(proxyRelayService, this, ctx, clientId)); + k -> new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, this, ctx, clientId)); } public GrpcClientChannel getChannel(String clientId) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java index ec14473fd..714d0bf01 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java @@ -18,13 +18,17 @@ package org.apache.rocketmq.proxy.grpc.v2.channel; import apache.rocketmq.v2.PrintThreadStackTraceCommand; import apache.rocketmq.v2.RecoverOrphanedTransactionCommand; +import apache.rocketmq.v2.Settings; import 
apache.rocketmq.v2.TelemetryCommand; import apache.rocketmq.v2.VerifyMessageCommand; import com.google.common.base.MoreObjects; import com.google.common.collect.ComparisonChain; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; +import com.google.protobuf.util.JsonFormat; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import io.netty.channel.Channel; import io.netty.channel.ChannelId; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; @@ -33,7 +37,14 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; +import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter; +import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; +import org.apache.rocketmq.proxy.processor.channel.RemoteChannelConverter; import org.apache.rocketmq.proxy.service.relay.ProxyChannel; import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; @@ -45,24 +56,70 @@ import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequest import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; -public class GrpcClientChannel extends ProxyChannel { +public class GrpcClientChannel extends ProxyChannel implements ChannelExtendAttributeGetter, 
RemoteChannelConverter { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private final GrpcChannelManager grpcChannelManager; + private final GrpcClientSettingsManager grpcClientSettingsManager; private final AtomicReference<StreamObserver<TelemetryCommand>> telemetryCommandRef = new AtomicReference<>(); private final Object telemetryWriteLock = new Object(); private final String clientId; - public GrpcClientChannel(ProxyRelayService proxyRelayService, GrpcChannelManager grpcChannelManager, - ProxyContext ctx, String clientId) { + public GrpcClientChannel(ProxyRelayService proxyRelayService, GrpcClientSettingsManager grpcClientSettingsManager, + GrpcChannelManager grpcChannelManager, ProxyContext ctx, String clientId) { super(proxyRelayService, null, new GrpcChannelId(clientId), ctx.getRemoteAddress(), ctx.getLocalAddress()); this.grpcChannelManager = grpcChannelManager; + this.grpcClientSettingsManager = grpcClientSettingsManager; this.clientId = clientId; } + @Override + public String getChannelExtendAttribute() { + Settings settings = this.grpcClientSettingsManager.getRawClientSettings(this.clientId); + if (settings == null) { + return null; + } + try { + return JsonFormat.printer().print(settings); + } catch (InvalidProtocolBufferException e) { + log.error("convert settings to json data failed. settings:{}", settings, e); + } + return null; + } + + public static Settings parseChannelExtendAttribute(Channel channel) { + if (ChannelHelper.getChannelProtocolType(channel).equals(ChannelProtocolType.GRPC_V2) && + channel instanceof ChannelExtendAttributeGetter) { + String attr = ((ChannelExtendAttributeGetter) channel).getChannelExtendAttribute(); + if (attr == null) { + return null; + } + + Settings.Builder builder = Settings.newBuilder(); + try { + JsonFormat.parser().merge(attr, builder); + return builder.build(); + } catch (InvalidProtocolBufferException e) { + log.error("convert settings json data to settings failed. 
data:{}", attr, e); + return null; + } + } + return null; + } + + @Override + public RemoteChannel toRemoteChannel() { + return new RemoteChannel( + ConfigurationManager.getProxyConfig().getLocalServeAddr(), + this.getRemoteAddress(), + this.getLocalAddress(), + ChannelProtocolType.GRPC_V2, + this.getChannelExtendAttribute()); + } + protected static class GrpcChannelId implements ChannelId { private final String clientId; @@ -181,14 +238,6 @@ public class GrpcClientChannel extends ProxyChannel { return clientId; } - public String getRemoteAddress() { - return remoteAddress; - } - - public String getLocalAddress() { - return localAddress; - } - public void writeTelemetryCommand(TelemetryCommand command) { StreamObserver<TelemetryCommand> observer = this.telemetryCommandRef.get(); if (observer == null) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index 20035f7a1..00cc862a4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -32,6 +32,7 @@ import apache.rocketmq.v2.ThreadStackTrace; import apache.rocketmq.v2.VerifyMessageResult; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import io.netty.channel.Channel; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -39,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerChangeListener; import org.apache.rocketmq.broker.client.ProducerGroupEvent; @@ -49,6 +51,7 @@ import 
org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; @@ -209,7 +212,8 @@ public class ClientActivity extends AbstractMessingActivity { }; } - protected void processTelemetryException(TelemetryCommand request, Throwable t, StreamObserver<TelemetryCommand> responseObserver) { + protected void processTelemetryException(TelemetryCommand request, Throwable t, + StreamObserver<TelemetryCommand> responseObserver) { StatusRuntimeException exception = io.grpc.Status.INTERNAL .withDescription("process client telemetryCommand failed. " + t.getMessage()) .withCause(t) @@ -291,7 +295,8 @@ public class ClientActivity extends AbstractMessingActivity { return channel; } - protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, List<SubscriptionEntry> subscriptionEntryList, boolean updateSubscription) { + protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, + List<SubscriptionEntry> subscriptionEntryList, boolean updateSubscription) { String clientId = ctx.getClientID(); LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage()); @@ -413,14 +418,67 @@ public class ClientActivity extends AbstractMessingActivity { @Override public void handle(ConsumerGroupEvent event, String group, Object... 
args) { - if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) { - if (args == null || args.length < 1) { - return; + switch (event) { + case CLIENT_UNREGISTER: + processClientUnregister(group, args); + break; + case REGISTER: + processClientRegister(group, args); + break; + default: + break; + } + } + + protected void processClientUnregister(String group, Object... args) { + if (args == null || args.length < 1) { + return; + } + if (args[0] instanceof ClientChannelInfo) { + ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; + String clientId = clientChannelInfo.getClientId(); + if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { + grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> { + if (grpcChannelManager.getChannel(clientId) == null) { + // if there is no channel connect directly to this proxy + return null; + } + return orgSettings; + }); + } else { + grpcChannelManager.removeChannel(clientId); + grpcClientSettingsManager.computeIfPresent(clientId, orgSettings -> { + ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(group); + if (consumerGroupInfo == null) { + return null; + } + List<Channel> allChannels = consumerGroupInfo.getAllChannel(); + if (allChannels == null || allChannels.isEmpty() || allChannels.size() == 1) { + // if there is only one channel of this clientId or + // there is no channel if this clientId + // remove settings of this client + return null; + } + return orgSettings; + }); } - if (args[0] instanceof ClientChannelInfo) { - ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; - grpcChannelManager.removeChannel(clientChannelInfo.getClientId()); - grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId()); + } + } + + protected void processClientRegister(String group, Object... 
args) { + if (args == null || args.length < 2) { + return; + } + if (args[1] instanceof ClientChannelInfo) { + ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[1]; + Channel channel = clientChannelInfo.getChannel(); + if (ChannelHelper.isRemote(channel)) { + // save settings from channel sync from other proxy + Settings settings = GrpcClientChannel.parseChannelExtendAttribute(channel); + if (settings == null) { + return; + } + grpcClientSettingsManager.updateClientSettings(clientChannelInfo.getClientId(), settings); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java index 8c3c3a8a4..21c3395ae 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; @@ -52,6 +53,10 @@ public class GrpcClientSettingsManager { this.messagingProcessor = messagingProcessor; } + public Settings getRawClientSettings(String clientId) { + return CLIENT_SETTINGS_MAP.get(clientId); + } + public Settings getClientSettings(ProxyContext ctx) { String clientId = ctx.getClientID(); Settings settings = CLIENT_SETTINGS_MAP.get(clientId); @@ -184,6 +189,10 @@ public class GrpcClientSettingsManager { CLIENT_SETTINGS_MAP.remove(clientId); } + public void computeIfPresent(String clientId, Function<Settings, Settings> function) { + CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, value) -> function.apply(value)); + } + public Settings 
removeAndGetClientSettings(ProxyContext ctx) { String clientId = ctx.getClientID(); Settings settings = CLIENT_SETTINGS_MAP.remove(clientId); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java index 5408fa066..8fb6eaf7d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java @@ -101,6 +101,11 @@ public class ClientProcessor extends AbstractProcessor { this.serviceManager.getConsumerManager().unregisterConsumer(consumerGroup, clientChannelInfo, false); } + public void doChannelCloseEvent(String remoteAddr, Channel channel) { + this.serviceManager.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.serviceManager.getProducerManager().doChannelCloseEvent(remoteAddr, channel); + } + public void registerConsumerIdsChangeListener(ConsumerIdsChangeListener listener) { this.serviceManager.getConsumerManager().appendConsumerIdsChangeListener(listener); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 1b7baba0a..66239f0e8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -274,6 +274,11 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen this.clientProcessor.registerConsumerIdsChangeListener(consumerIdsChangeListener); } + @Override + public void doChannelCloseEvent(String remoteAddr, Channel channel) { + this.clientProcessor.doChannelCloseEvent(remoteAddr, channel); + } + @Override public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) { return 
this.clientProcessor.getConsumerGroupInfo(consumerGroup); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 3e8b8084e..3c4e6303f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -285,6 +285,8 @@ public interface MessagingProcessor extends StartAndShutdown { ConsumerIdsChangeListener consumerIdsChangeListener ); + void doChannelCloseEvent(String remoteAddr, Channel channel); + ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup); void addTransactionSubscription( diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index d435b0c2e..9d000bfe9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -47,6 +47,7 @@ import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; import org.apache.rocketmq.proxy.common.StartAndShutdown; +import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; @@ -105,6 +106,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown { } if (args[0] instanceof ClientChannelInfo) { ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; + if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { + // if the channel sync from other proxy is expired, not to clear data of connect to current proxy + 
return; + } clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java new file mode 100644 index 000000000..3538a9496 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelExtendAttributeGetter.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.processor.channel; + +public interface ChannelExtendAttributeGetter { + + String getChannelExtendAttribute(); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java new file mode 100644 index 000000000..d2eeb8353 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/ChannelProtocolType.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.processor.channel; + +public enum ChannelProtocolType { + UNKNOWN("unknown"), + GRPC_V2("grpc_v2"), + GRPC_V1("grpc_v1"), + REMOTING("remoting"); + + private final String name; + + ChannelProtocolType(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java new file mode 100644 index 000000000..5d9c9afcc --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannel.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.processor.channel; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelId; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; + +public class RemoteChannel extends SimpleChannel implements ChannelExtendAttributeGetter { + protected final ChannelProtocolType type; + protected final String remoteProxyIp; + protected volatile String extendAttribute; + + public RemoteChannel(String remoteProxyIp, String remoteAddress, String localAddress, ChannelProtocolType type, String extendAttribute) { + super(null, + new RemoteChannelId(remoteProxyIp, remoteAddress, localAddress, type), + remoteAddress, localAddress); + this.type = type; + this.remoteProxyIp = remoteProxyIp; + this.extendAttribute = extendAttribute; + } + + public static class RemoteChannelId implements ChannelId { + + private final String id; + + public RemoteChannelId(String remoteProxyIp, String remoteAddress, String localAddress, ChannelProtocolType type) { + this.id = remoteProxyIp + "@" + remoteAddress + "@" + localAddress + "@" + type; + } + + @Override + public String asShortText() { + return this.id; + } + + @Override + public String asLongText() { + return this.id; + } + + @Override + public int compareTo(ChannelId o) { + return this.id.compareTo(o.asLongText()); + } + } + + @Override + public boolean isWritable() { + return false; + } + + public ChannelProtocolType getType() { + return type; + } + + public String encode() { + return RemoteChannelSerializer.toJson(this); + } + + public static RemoteChannel decode(String 
data) { + return RemoteChannelSerializer.decodeFromJson(data); + } + + public static RemoteChannel create(Channel channel) { + if (channel instanceof RemoteChannelConverter) { + return ((RemoteChannelConverter) channel).toRemoteChannel(); + } + return null; + } + + public String getRemoteProxyIp() { + return remoteProxyIp; + } + + public void setExtendAttribute(String extendAttribute) { + this.extendAttribute = extendAttribute; + } + + @Override + public String getChannelExtendAttribute() { + return this.extendAttribute; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java new file mode 100644 index 000000000..9f886e85d --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelConverter.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.processor.channel; + +public interface RemoteChannelConverter { + + RemoteChannel toRemoteChannel(); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java new file mode 100644 index 000000000..8fd216219 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.processor.channel; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + +public class RemoteChannelSerializer { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final String REMOTE_PROXY_IP_KEY = "remoteProxyIp"; + private static final String REMOTE_ADDRESS_KEY = "remoteAddress"; + private static final String LOCAL_ADDRESS_KEY = "localAddress"; + private static final String TYPE_KEY = "type"; + private static final String EXTEND_ATTRIBUTE_KEY = "extendAttribute"; + + public static String toJson(RemoteChannel remoteChannel) { + Map<String, Object> data = new HashMap<>(); + data.put(REMOTE_PROXY_IP_KEY, remoteChannel.getRemoteProxyIp()); + data.put(REMOTE_ADDRESS_KEY, remoteChannel.getRemoteAddress()); + data.put(LOCAL_ADDRESS_KEY, remoteChannel.getLocalAddress()); + data.put(TYPE_KEY, remoteChannel.getType()); + data.put(EXTEND_ATTRIBUTE_KEY, remoteChannel.getChannelExtendAttribute()); + return JSON.toJSONString(data); + } + + public static RemoteChannel decodeFromJson(String jsonData) { + if (StringUtils.isBlank(jsonData)) { + return null; + } + try { + JSONObject jsonObject = JSON.parseObject(jsonData); + return new RemoteChannel( + jsonObject.getString(REMOTE_PROXY_IP_KEY), + jsonObject.getString(REMOTE_ADDRESS_KEY), + jsonObject.getString(LOCAL_ADDRESS_KEY), + jsonObject.getObject(TYPE_KEY, ChannelProtocolType.class), + jsonObject.getObject(EXTEND_ATTRIBUTE_KEY, String.class) + ); + } catch (Throwable t) { + log.error("decode remote channel data failed. 
data:{}", jsonData, t); + } + return null; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java new file mode 100644 index 000000000..e213ae855 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.remoting; + +import io.netty.channel.Channel; +import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity; +import org.apache.rocketmq.remoting.ChannelEventListener; + +public class ClientHousekeepingService implements ChannelEventListener { + + private final ClientManagerActivity clientManagerActivity; + + public ClientHousekeepingService(ClientManagerActivity clientManagerActivity) { + this.clientManagerActivity = clientManagerActivity; + } + + @Override + public void onChannelConnect(String remoteAddr, Channel channel) { + + } + + @Override + public void onChannelClose(String remoteAddr, Channel channel) { + this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel); + } + + @Override + public void onChannelException(String remoteAddr, Channel channel) { + this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel); + } + + @Override + public void onChannelIdle(String remoteAddr, Channel channel) { + this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel); + } + +} + diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java new file mode 100644 index 000000000..58b257641 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting; + +import io.netty.channel.Channel; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.proxy.common.StartAndShutdown; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { + + private final MessagingProcessor messagingProcessor; + private RemotingServer defaultRemotingServer; + + public RemotingProtocolServer(MessagingProcessor messagingProcessor) { + this.messagingProcessor = messagingProcessor; + } + + protected void init() { + + } + + protected void registerRemotingServer(RemotingServer remotingServer) { + + } + + @Override + public void shutdown() throws Exception { + + } + + @Override + public void start() throws Exception { + + } + + @Override + public CompletableFuture<RemotingCommand> invokeToClient(Channel channel, RemotingCommand request, + long timeoutMillis) { + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + try { + this.defaultRemotingServer.invokeAsync(channel, request, timeoutMillis, responseFuture -> { + if (responseFuture.getResponseCommand() == null) { + future.completeExceptionally(new MQClientException("response is null after send request to client", responseFuture.getCause())); + return; + } + future.complete(responseFuture.getResponseCommand()); + }); 
+ } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java new file mode 100644 index 000000000..5a96c41c9 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProxyOutClient.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.remoting; + +import io.netty.channel.Channel; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public interface RemotingProxyOutClient { + + CompletableFuture<RemotingCommand> invokeToClient(Channel channel, RemotingCommand request, long timeoutMillis); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java new file mode 100644 index 000000000..62400e033 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.util.Set; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupEvent; +import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; +import org.apache.rocketmq.broker.client.ProducerChangeListener; +import org.apache.rocketmq.broker.client.ProducerGroupEvent; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; +import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; +import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class ClientManagerActivity extends AbstractRemotingActivity { + + private final RemotingChannelManager remotingChannelManager; + + public ClientManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor, + RemotingChannelManager manager) { + super(requestPipeline, messagingProcessor); + this.remotingChannelManager = manager; + this.init(); + } + + protected void init() { + this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListenerImpl()); + this.messagingProcessor.registerProducerListener(new 
ProducerChangeListenerImpl()); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + switch (request.getCode()) { + case RequestCode.HEART_BEAT: + return this.heartBeat(ctx, request, context); + case RequestCode.UNREGISTER_CLIENT: + return this.unregisterClient(ctx, request, context); + case RequestCode.CHECK_CLIENT_CONFIG: + return this.checkClientConfig(ctx, request, context); + default: + break; + } + return null; + } + + protected RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) { + HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); + String clientId = heartbeatData.getClientID(); + + for (ProducerData data : heartbeatData.getProducerDataSet()) { + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + this.remotingChannelManager.createProducerChannel(ctx.channel(), data.getGroupName(), clientId), + clientId, request.getLanguage(), + request.getVersion()); + messagingProcessor.registerProducer(context, data.getGroupName(), clientChannelInfo); + } + + for (ConsumerData data : heartbeatData.getConsumerDataSet()) { + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + this.remotingChannelManager.createConsumerChannel(ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()), + clientId, request.getLanguage(), + request.getVersion()); + messagingProcessor.registerConsumer(context, data.getGroupName(), clientChannelInfo, data.getConsumeType(), + data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), true); + } + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(""); + return response; + } + + protected RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws 
RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class); + final UnregisterClientRequestHeader requestHeader = + (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class); + final String producerGroup = requestHeader.getProducerGroup(); + if (producerGroup != null) { + RemotingChannel channel = this.remotingChannelManager.removeProducerChannel(producerGroup, ctx.channel()); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + channel, + requestHeader.getClientID(), + request.getLanguage(), + request.getVersion()); + this.messagingProcessor.unRegisterProducer(context, producerGroup, clientChannelInfo); + } + final String consumerGroup = requestHeader.getConsumerGroup(); + if (consumerGroup != null) { + RemotingChannel channel = this.remotingChannelManager.removeConsumerChannel(consumerGroup, ctx.channel()); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + channel, + requestHeader.getClientID(), + request.getLanguage(), + request.getVersion()); + this.messagingProcessor.unRegisterConsumer(context, consumerGroup, clientChannelInfo); + } + response.setCode(ResponseCode.SUCCESS); + response.setRemark(""); + return response; + } + + protected RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(""); + return response; + } + + public void doChannelCloseEvent(String remoteAddr, Channel channel) { + Set<RemotingChannel> remotingChannelSet = this.remotingChannelManager.removeChannel(channel); + for (RemotingChannel remotingChannel : remotingChannelSet) { + this.messagingProcessor.doChannelCloseEvent(remoteAddr, remotingChannel); + } + } + + protected class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener { 
+ + @Override + public void handle(ConsumerGroupEvent event, String group, Object... args) { + if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) { + if (args == null || args.length < 1) { + return; + } + if (args[0] instanceof ClientChannelInfo) { + ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; + remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel()); + } + } + } + + @Override + public void shutdown() { + + } + } + + protected class ProducerChangeListenerImpl implements ProducerChangeListener { + + @Override + public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) { + if (event == ProducerGroupEvent.CLIENT_UNREGISTER) { + remotingChannelManager.removeProducerChannel(group, clientChannelInfo.getChannel()); + } + } + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java new file mode 100644 index 000000000..2b2cfca79 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.remoting.channel; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter; +import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; +import org.apache.rocketmq.proxy.processor.channel.RemoteChannelConverter; +import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; +import org.apache.rocketmq.proxy.remoting.common.RemotingConverter; +import org.apache.rocketmq.proxy.service.relay.ProxyChannel; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.proxy.service.transaction.TransactionData; +import 
org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class RemotingChannel extends ProxyChannel implements RemoteChannelConverter, ChannelExtendAttributeGetter { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final long DEFAULT_MQ_CLIENT_TIMEOUT = Duration.ofSeconds(3).toMillis(); + private final String clientId; + private final String remoteAddress; + private final String localAddress; + private final RemotingProxyOutClient remotingProxyOutClient; + private final Set<SubscriptionData> subscriptionData; + + public RemotingChannel(RemotingProxyOutClient remotingProxyOutClient, ProxyRelayService proxyRelayService, + Channel parent, + String clientId, Set<SubscriptionData> subscriptionData) { + super(proxyRelayService, parent, parent.id(), + RemotingUtil.socketAddress2String(parent.remoteAddress()), + RemotingUtil.socketAddress2String(parent.localAddress())); + this.remotingProxyOutClient = remotingProxyOutClient; + this.clientId = clientId; + this.remoteAddress = RemotingUtil.socketAddress2String(parent.remoteAddress()); + this.localAddress = RemotingUtil.socketAddress2String(parent.localAddress()); + this.subscriptionData = subscriptionData; + } + + @Override + public boolean isOpen() { + return this.parent().isOpen(); + } + + @Override + public boolean isActive() { + return this.parent().isActive(); + } + + @Override + public boolean isWritable() { + return this.parent().isWritable(); + } + + @Override + protected CompletableFuture<Void> processOtherMessage(Object msg) { + this.parent().writeAndFlush(msg); + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture<Void> processCheckTransaction(CheckTransactionStateRequestHeader header, + MessageExt messageExt, TransactionData transactionData, + 
CompletableFuture<ProxyRelayResult<Void>> responseFuture) { + CompletableFuture<Void> writeFuture = new CompletableFuture<>(); + try { + CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader(); + requestHeader.setCommitLogOffset(transactionData.getCommitLogOffset()); + requestHeader.setTranStateTableOffset(transactionData.getTranStateTableOffset()); + requestHeader.setTransactionId(transactionData.getTransactionId()); + requestHeader.setMsgId(header.getMsgId()); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); + request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt)); + + this.parent().writeAndFlush(request).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + responseFuture.complete(null); + writeFuture.complete(null); + } else { + Exception e = new RemotingException("write and flush data failed"); + responseFuture.completeExceptionally(e); + writeFuture.completeExceptionally(e); + } + }); + } catch (Throwable t) { + responseFuture.completeExceptionally(t); + writeFuture.completeExceptionally(t); + } + return writeFuture; + } + + @Override + protected CompletableFuture<Void> processGetConsumerRunningInfo(RemotingCommand command, + GetConsumerRunningInfoRequestHeader header, + CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> responseFuture) { + CompletableFuture<Void> writeFuture = new CompletableFuture<>(); + try { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + return this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT) + .thenApply(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class); + responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", 
consumerRunningInfo)); + return null; + } + String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark()); + RuntimeException e = new RuntimeException(errMsg); + responseFuture.completeExceptionally(e); + throw e; + }); + } catch (Throwable t) { + responseFuture.completeExceptionally(t); + writeFuture.completeExceptionally(t); + } + return writeFuture; + } + + @Override + protected CompletableFuture<Void> processConsumeMessageDirectly(RemotingCommand command, + ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, + CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> responseFuture) { + CompletableFuture<Void> writeFuture = new CompletableFuture<>(); + try { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, header); + request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt)); + + return this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT) + .thenApply(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ConsumeMessageDirectlyResult result = ConsumeMessageDirectlyResult.decode(response.getBody(), ConsumeMessageDirectlyResult.class); + responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", result)); + return null; + } + String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark()); + RuntimeException e = new RuntimeException(errMsg); + responseFuture.completeExceptionally(e); + throw e; + }); + } catch (Throwable t) { + responseFuture.completeExceptionally(t); + writeFuture.completeExceptionally(t); + } + return writeFuture; + } + + public String getClientId() { + return clientId; + } + + @Override + public String getChannelExtendAttribute() { + if (this.subscriptionData == null) { + return null; + } + return JSON.toJSONString(this.subscriptionData); + } + + 
public static Set<SubscriptionData> parseChannelExtendAttribute(Channel channel) { + if (ChannelHelper.getChannelProtocolType(channel).equals(ChannelProtocolType.REMOTING) && + channel instanceof ChannelExtendAttributeGetter) { + String attr = ((ChannelExtendAttributeGetter) channel).getChannelExtendAttribute(); + if (attr == null) { + return null; + } + + try { + return JSON.parseObject(attr, new TypeReference<Set<SubscriptionData>>() { + }); + } catch (Exception e) { + log.error("convert remoting extend attribute to subscriptionDataSet failed. data:{}", attr, e); + return null; + } + } + return null; + } + + @Override + public RemoteChannel toRemoteChannel() { + return new RemoteChannel( + ConfigurationManager.getProxyConfig().getLocalServeAddr(), + this.getRemoteAddress(), + this.getLocalAddress(), + ChannelProtocolType.REMOTING, + this.getChannelExtendAttribute()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java new file mode 100644 index 000000000..bdc6457e7 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.channel; + +import io.netty.channel.Channel; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.proxy.common.StartAndShutdown; +import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RemotingChannelManager implements StartAndShutdown { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private final ProxyRelayService proxyRelayService; + protected final ConcurrentMap<String /* group */, Map<Channel /* raw channel */, RemotingChannel>> groupChannelMap = new ConcurrentHashMap<>(); + + private final RemotingProxyOutClient remotingProxyOutClient; + + public RemotingChannelManager(RemotingProxyOutClient remotingProxyOutClient, ProxyRelayService proxyRelayService) { + this.remotingProxyOutClient = remotingProxyOutClient; + this.proxyRelayService = proxyRelayService; + } + + protected String buildProducerKey(String group) { + return buildKey("p", group); + } + + protected String buildConsumerKey(String group) { + return buildKey("c", group); + } + + protected String buildKey(String prefix, String group) { + return prefix + group; + } + + protected String getGroupFromKey(String key) { + return key.substring(1); + } + + public RemotingChannel createProducerChannel(Channel channel, String group, String clientId) { + return createChannel(channel, buildProducerKey(group), clientId, 
Collections.emptySet()); + } + + public RemotingChannel createConsumerChannel(Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) { + return createChannel(channel, buildConsumerKey(group), clientId, subscriptionData); + } + + protected RemotingChannel createChannel(Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) { + this.groupChannelMap.compute(group, (groupKey, clientIdMap) -> { + if (clientIdMap == null) { + clientIdMap = new ConcurrentHashMap<>(); + } + clientIdMap.computeIfAbsent(channel, clientIdKey -> new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, subscriptionData)); + return clientIdMap; + }); + return getChannel(group, channel); + } + + public RemotingChannel getConsumerChannel(String group, Channel channel) { + return getChannel(buildConsumerKey(group), channel); + } + + public RemotingChannel getProducerChannel(String group, Channel channel) { + return getChannel(buildProducerKey(group), channel); + } + + protected RemotingChannel getChannel(String group, Channel channel) { + Map<Channel, RemotingChannel> clientIdChannelMap = this.groupChannelMap.get(group); + if (clientIdChannelMap == null) { + return null; + } + return clientIdChannelMap.get(channel); + } + + public Set<RemotingChannel> removeChannel(Channel channel) { + Set<RemotingChannel> removedChannelSet = new HashSet<>(); + for (Map.Entry<String, Map<Channel /* raw channel */, RemotingChannel>> entry : groupChannelMap.entrySet()) { + Map<Channel /* raw channel */, RemotingChannel> channelMap = entry.getValue(); + + RemotingChannel remotingChannel = channelMap.remove(channel); + if (remotingChannel != null) { + removedChannelSet.add(remotingChannel); + } + } + return removedChannelSet; + } + + public RemotingChannel removeProducerChannel(String group, Channel channel) { + return removeChannel(buildProducerKey(group), channel); + } + + public RemotingChannel removeConsumerChannel(String 
group, Channel channel) { + return removeChannel(buildConsumerKey(group), channel); + } + + protected RemotingChannel removeChannel(String group, Channel channel) { + AtomicReference<RemotingChannel> channelRef = new AtomicReference<>(); + + this.groupChannelMap.computeIfPresent(group, (groupKey, channelMap) -> { + channelRef.set(channelMap.remove(channel)); + if (channelMap.isEmpty()) { + return null; + } + return channelMap; + }); + return channelRef.get(); + } + + @Override + public void shutdown() throws Exception { + + } + + @Override + public void start() throws Exception { + + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java new file mode 100644 index 000000000..55af1ff19 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.remoting.common; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + +public class RemotingConverter { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + + protected static final Object INSTANCE_CREATE_LOCK = new Object(); + protected static volatile RemotingConverter instance; + + public static RemotingConverter getInstance() { + if (instance == null) { + synchronized (INSTANCE_CREATE_LOCK) { + if (instance == null) { + instance = new RemotingConverter(); + } + } + } + return instance; + } + + public byte[] convertMsgToBytes(List<MessageExt> msgList) { + // set response body + byte[][] msgBufferList = new byte[msgList.size()][]; + int bodyTotalSize = 0; + for (int i = 0; i < msgList.size(); i++) { + try { + msgBufferList[i] = convertMsgToBytes(msgList.get(i)); + bodyTotalSize += msgBufferList[i].length; + } catch (Exception e) { + log.error("messageToByteBuffer UnsupportedEncodingException", e); + } + } + + ByteBuffer body = ByteBuffer.allocate(bodyTotalSize); + for (byte[] bb : msgBufferList) { + body.put(bb); + } + + return body.array(); + } + + public byte[] convertMsgToBytes(final MessageExt msg) throws Exception { + // change to 0 for recalculate storeSize + msg.setStoreSize(0); + if (msg.getTopic().length() > Byte.MAX_VALUE) { + log.warn("Topic length is too long, topic: {}", msg.getTopic()); + } + return MessageDecoder.encode(msg, false); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java index ac1ff6a88..24b27aaa2 100644 --- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java @@ -32,6 +32,9 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.service.admin.AdminService; +import org.apache.rocketmq.proxy.service.admin.DefaultAdminService; +import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager; import org.apache.rocketmq.proxy.service.message.ClusterMessageService; import org.apache.rocketmq.proxy.service.message.MessageService; import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService; @@ -52,11 +55,12 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S protected ClusterTransactionService clusterTransactionService; protected ProducerManager producerManager; - protected ConsumerManager consumerManager; + protected ClusterConsumerManager consumerManager; protected TopicRouteService topicRouteService; protected MessageService messageService; protected ProxyRelayService proxyRelayService; protected ClusterMetadataService metadataService; + protected AdminService adminService; protected ScheduledExecutorService scheduledExecutorService; protected MQClientAPIFactory messagingClientAPIFactory; @@ -67,7 +71,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); this.scheduledExecutorService = Executors.newScheduledThreadPool(3); this.producerManager = new ProducerManager(); - this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout()); + this.consumerManager = new ClusterConsumerManager(this.topicRouteService, this.adminService, 
this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout()); this.messagingClientAPIFactory = new MQClientAPIFactory( "ClusterMQClient_", @@ -76,7 +80,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S rpcHook, scheduledExecutorService); this.operationClientAPIFactory = new MQClientAPIFactory( - "TopicRouteServiceClient_", + "OperationClient_", 1, new DoNothingClientRemotingProcessor(null), rpcHook, @@ -95,6 +99,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S this.transactionClientAPIFactory); this.proxyRelayService = new ClusterProxyRelayService(this.clusterTransactionService); this.metadataService = new ClusterMetadataService(topicRouteService, operationClientAPIFactory); + this.adminService = new DefaultAdminService(this.operationClientAPIFactory); this.init(); } @@ -118,6 +123,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S this.appendStartAndShutdown(this.topicRouteService); this.appendStartAndShutdown(this.clusterTransactionService); this.appendStartAndShutdown(this.metadataService); + this.appendStartAndShutdown(this.consumerManager); } @Override @@ -155,6 +161,11 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S return this.metadataService; } + @Override + public AdminService getAdminService() { + return this.adminService; + } + protected static class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener { @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java index 6afc86c57..4f829caa6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java @@ -25,6 +25,8 @@ import 
org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.common.StartAndShutdown; +import org.apache.rocketmq.proxy.service.admin.AdminService; +import org.apache.rocketmq.proxy.service.admin.DefaultAdminService; import org.apache.rocketmq.proxy.service.channel.ChannelManager; import org.apache.rocketmq.proxy.service.message.LocalMessageService; import org.apache.rocketmq.proxy.service.message.MessageService; @@ -48,6 +50,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser private final TransactionService transactionService; private final ProxyRelayService proxyRelayService; private final MetadataService metadataService; + private final AdminService adminService; private final MQClientAPIFactory mqClientAPIFactory; private final ChannelManager channelManager; @@ -60,7 +63,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser this.channelManager = new ChannelManager(); this.messageService = new LocalMessageService(brokerController, channelManager, rpcHook); this.mqClientAPIFactory = new MQClientAPIFactory( - "TopicRouteServiceClient_", + "LocalMQClient_", 1, new DoNothingClientRemotingProcessor(null), rpcHook, @@ -70,6 +73,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser this.transactionService = new LocalTransactionService(brokerController.getBrokerConfig()); this.proxyRelayService = new LocalProxyRelayService(brokerController, this.transactionService); this.metadataService = new LocalMetadataService(brokerController); + this.adminService = new DefaultAdminService(this.mqClientAPIFactory); this.init(); } @@ -114,6 +118,11 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser return this.metadataService; } + @Override + public AdminService getAdminService() { + return this.adminService; + } + 
private class LocalServiceManagerStartAndShutdown implements StartAndShutdown { @Override public void start() throws Exception { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java index 563b56715..ce84832ca 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java @@ -16,9 +16,10 @@ */ package org.apache.rocketmq.proxy.service; -import org.apache.rocketmq.broker.client.ConsumerManager; -import org.apache.rocketmq.broker.client.ProducerManager; +import org.apache.rocketmq.broker.client.ConsumerManagerInterface; +import org.apache.rocketmq.broker.client.ProducerManagerInterface; import org.apache.rocketmq.proxy.common.StartAndShutdown; +import org.apache.rocketmq.proxy.service.admin.AdminService; import org.apache.rocketmq.proxy.service.message.MessageService; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; @@ -30,13 +31,15 @@ public interface ServiceManager extends StartAndShutdown { TopicRouteService getTopicRouteService(); - ProducerManager getProducerManager(); + ProducerManagerInterface getProducerManager(); - ConsumerManager getConsumerManager(); + ConsumerManagerInterface getConsumerManager(); TransactionService getTransactionService(); ProxyRelayService getProxyRelayService(); MetadataService getMetadataService(); + + AdminService getAdminService(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java new file mode 100644 index 000000000..d98d17ff7 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
/**
 * Administrative topic operations used by the proxy.
 */
public interface AdminService {

    /**
     * Checks whether a topic exists.
     *
     * @param topic topic name
     * @return {@code true} when the topic exists, {@code false} otherwise
     */
    boolean topicExist(String topic);

    /**
     * Creates a topic on the brokers that serve {@code sampleTopic}, unless it is already there.
     * NOTE(review): the skip-if-present rule is inferred from the method name and parameters; confirm against the implementation.
     *
     * @param createTopic     topic to create
     * @param sampleTopic     topic whose brokers are used as creation targets
     * @param wQueueNum       number of write queues
     * @param rQueueNum       number of read queues
     * @param examineTopic    whether to verify afterwards that the topic exists
     * @param retryCheckCount maximum number of existence checks when {@code examineTopic} is set
     * @return {@code true} on success, {@code false} otherwise
     */
    boolean createTopicOnTopicBrokerIfNotExist(String createTopic, String sampleTopic, int wQueueNum,
        int rQueueNum, boolean examineTopic, int retryCheckCount);

    /**
     * Creates a topic on the brokers of {@code sampleBrokerDataList}.
     * NOTE(review): brokers listed in {@code curBrokerDataList} are presumably skipped; confirm against the implementation.
     *
     * @param topic                topic to create
     * @param wQueueNum            number of write queues
     * @param rQueueNum            number of read queues
     * @param curBrokerDataList    brokers that currently serve the topic
     * @param sampleBrokerDataList candidate brokers to create the topic on
     * @param examineTopic         whether to verify afterwards that the topic exists
     * @param retryCheckCount      maximum number of existence checks when {@code examineTopic} is set
     * @return {@code true} on success, {@code false} otherwise
     * @throws Exception when a create request fails
     */
    boolean createTopicOnBroker(String topic, int wQueueNum, int rQueueNum, List<BrokerData> curBrokerDataList,
        List<BrokerData> sampleBrokerDataList, boolean examineTopic, int retryCheckCount) throws Exception;
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.admin; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt; +import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; +import org.apache.rocketmq.proxy.service.route.TopicRouteHelper; + +public class DefaultAdminService implements AdminService { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private final MQClientAPIFactory mqClientAPIFactory; + + public DefaultAdminService(MQClientAPIFactory mqClientAPIFactory) { + this.mqClientAPIFactory = mqClientAPIFactory; + } + + @Override + public boolean topicExist(String topic) { + boolean topicExist; + TopicRouteData topicRouteData; + try { + topicRouteData = this.getTopicRouteDataDirectlyFromNameServer(topic); + 
topicExist = topicRouteData != null; + } catch (Throwable e) { + topicExist = false; + } + + return topicExist; + } + + @Override + public boolean createTopicOnTopicBrokerIfNotExist(String createTopic, String sampleTopic, int wQueueNum, + int rQueueNum, boolean examineTopic, int retryCheckCount) { + TopicRouteData curTopicRouteData = new TopicRouteData(); + try { + curTopicRouteData = this.getTopicRouteDataDirectlyFromNameServer(createTopic); + } catch (Exception e) { + if (!TopicRouteHelper.isTopicNotExistError(e)) { + log.error("get cur topic route {} failed.", createTopic, e); + return false; + } + } + + TopicRouteData sampleTopicRouteData = null; + try { + sampleTopicRouteData = this.getTopicRouteDataDirectlyFromNameServer(sampleTopic); + } catch (Exception e) { + log.error("create topic {} failed.", createTopic, e); + return false; + } + + if (sampleTopicRouteData == null || sampleTopicRouteData.getBrokerDatas().isEmpty()) { + return false; + } + + try { + return this.createTopicOnBroker(createTopic, wQueueNum, rQueueNum, curTopicRouteData.getBrokerDatas(), + sampleTopicRouteData.getBrokerDatas(), examineTopic, retryCheckCount); + } catch (Exception e) { + log.error("create topic {} failed.", createTopic, e); + } + return false; + } + + @Override + public boolean createTopicOnBroker(String topic, int wQueueNum, int rQueueNum, List<BrokerData> curBrokerDataList, + List<BrokerData> sampleBrokerDataList, boolean examineTopic, int retryCheckCount) throws Exception { + Set<String> curBrokerAddr = new HashSet<>(); + if (curBrokerDataList != null) { + for (BrokerData brokerData : curBrokerDataList) { + curBrokerAddr.add(brokerData.getBrokerAddrs().get(MixAll.MASTER_ID)); + } + } + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setWriteQueueNums(wQueueNum); + topicConfig.setReadQueueNums(rQueueNum); + topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + + for (BrokerData brokerData : sampleBrokerDataList) { 
+ String addr = brokerData.getBrokerAddrs() == null ? null : brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); + if (addr == null) { + continue; + } + if (curBrokerAddr.contains(addr)) { + continue; + } + + this.getClient().createTopic(addr, TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topicConfig, Duration.ofSeconds(3).toMillis()); + } + + if (examineTopic) { + // examine topic exist. + int count = retryCheckCount; + while (count-- > 0) { + if (this.topicExist(topic)) { + return true; + } + } + } else { + return true; + } + return false; + } + + protected TopicRouteData getTopicRouteDataDirectlyFromNameServer(String topic) throws Exception { + return this.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis()); + } + + protected MQClientAPIExt getClient() { + return this.mqClientAPIFactory.getClient(); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java index 04ad5e269..65c1fd406 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java @@ -196,6 +196,14 @@ public class SimpleChannel extends AbstractChannel { } + public String getRemoteAddress() { + return remoteAddress; + } + + public String getLocalAddress() { + return localAddress; + } + public ChannelHandlerContext getChannelHandlerContext() { return channelHandlerContext; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java new file mode 100644 index 000000000..3bb65b03e --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.client;
+
+import java.util.Set;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
+
+/**
+ * {@link ConsumerManager} that reports every register/unregister call to a {@link HeartbeatSyncer}
+ * before applying it to the inherited consumer state.
+ *
+ * <p>The lifecycle methods only start and stop the owned {@link HeartbeatSyncer}.
+ */
+public class ClusterConsumerManager extends ConsumerManager implements ConsumerManagerInterface, StartAndShutdown {
+
+    protected HeartbeatSyncer heartbeatSyncer;
+
+    public ClusterConsumerManager(TopicRouteService topicRouteService, AdminService adminService,
+        MQClientAPIFactory mqClientAPIFactory, ConsumerIdsChangeListener consumerIdsChangeListener, long channelExpiredTimeout) {
+        super(consumerIdsChangeListener, channelExpiredTimeout);
+        this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, this, mqClientAPIFactory);
+    }
+
+    /** Starts the heartbeat syncer. */
+    @Override
+    public void start() throws Exception {
+        heartbeatSyncer.start();
+    }
+
+    /** Stops the heartbeat syncer. */
+    @Override
+    public void shutdown() throws Exception {
+        heartbeatSyncer.shutdown();
+    }
+
+    /**
+     * Hands the registration to the heartbeat syncer, then registers the consumer in the parent manager.
+     *
+     * @return whatever the parent {@code registerConsumer} returns
+     */
+    @Override
+    public boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) {
+        // The syncer is notified first, exactly once per call, regardless of the parent's result.
+        heartbeatSyncer.onConsumerRegister(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList);
+        boolean parentResult = super.registerConsumer(group, clientChannelInfo, consumeType, messageModel,
+            consumeFromWhere, subList, isNotifyConsumerIdsChangedEnable, updateSubscription);
+        return parentResult;
+    }
+
+    /** Hands the unregistration to the heartbeat syncer, then removes the consumer from the parent manager. */
+    @Override
+    public void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo,
+        boolean isNotifyConsumerIdsChangedEnable) {
+        heartbeatSyncer.onConsumerUnRegister(group, clientChannelInfo);
+        super.unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
new file mode 100644
index 000000000..72593525e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.RPCHook;
+
+/**
+ * Base class for components that publish JSON-encoded system messages to a dedicated system topic and
+ * consume that same topic with a push consumer in {@link MessageModel#BROADCASTING} mode.
+ *
+ * <p>The instance registers itself as the consumer's message listener in {@link #start()}, so subclasses
+ * provide the handling logic by implementing {@link MessageListenerConcurrently}.
+ */
+public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, MessageListenerConcurrently {
+    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    protected final TopicRouteService topicRouteService;
+    protected final AdminService adminService;
+    protected final String systemResourceName;
+    protected final MQClientAPIFactory mqClientAPIFactory;
+    /** Created by {@link #start()}; {@code null} before that point. */
+    protected DefaultMQPushConsumer defaultMQPushConsumer;
+
+    public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, AdminService adminService, MQClientAPIFactory mqClientAPIFactory) {
+        this.topicRouteService = topicRouteService;
+        this.adminService = adminService;
+        this.mqClientAPIFactory = mqClientAPIFactory;
+
+        this.systemResourceName = this.getSystemResourceName();
+    }
+
+    /**
+     * Builds the name from which the topic, producer id and consumer id are derived: the system topic
+     * prefix, {@code "proxy_"}, the simple name of the concrete class, and the configured proxy cluster name.
+     */
+    protected String getSystemResourceName() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        return TopicValidator.SYSTEM_TOPIC_PREFIX + "proxy_" + this.getClass().getSimpleName() + "_" + proxyConfig.getProxyClusterName();
+    }
+
+    /** @return the producer group used when sending system messages ({@code "PID_"} + resource name) */
+    protected String getSystemMessageProducerId() {
+        return "PID_" + this.systemResourceName;
+    }
+
+    /** @return the consumer group used by the broadcast consumer ({@code "CID_"} + resource name) */
+    protected String getSystemMessageConsumerId() {
+        return "CID_" + this.systemResourceName;
+    }
+
+    /** @return the topic that system messages are sent to and consumed from */
+    protected String getBroadcastTopicName() {
+        return this.systemResourceName;
+    }
+
+    /** @return the subscription expression passed to the consumer; {@code "*"} by default */
+    protected String getSubTag() {
+        return "*";
+    }
+
+    /** @return the cluster on which the system topic is created, read from the proxy configuration */
+    protected String getBroadcastTopicClusterName() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        return proxyConfig.getSystemTopicClusterName();
+    }
+
+    /** @return the queue number used for both arguments of the topic-creation call; {@code 1} by default */
+    protected int getBroadcastTopicQueueNum() {
+        return 1;
+    }
+
+    /** @return the RPC hook handed to the push consumer; {@code null} by default */
+    protected RPCHook getRpcHook() {
+        return null;
+    }
+
+    /**
+     * Serializes {@code data} to JSON and sends it asynchronously to one queue chosen by the write selector
+     * of the broadcast topic. Failures, synchronous or reported by the send future, are logged and never
+     * propagated to the caller.
+     *
+     * @param data object to serialize as the message body
+     */
+    protected void sendSystemMessage(Object data) {
+        String targetTopic = this.getBroadcastTopicName();
+        try {
+            Message message = new Message(
+                targetTopic,
+                JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8)
+            );
+
+            AddressableMessageQueue messageQueue = this.topicRouteService.getAllMessageQueueView(targetTopic)
+                .getWriteSelector().selectOne(true);
+            this.mqClientAPIFactory.getClient().sendMessageAsync(
+                messageQueue.getBrokerAddr(),
+                messageQueue.getBrokerName(),
+                message,
+                buildSendMessageRequestHeader(message, this.getSystemMessageProducerId(), messageQueue.getQueueId()),
+                Duration.ofSeconds(3).toMillis()
+            ).whenCompleteAsync((result, throwable) -> {
+                if (throwable != null) {
+                    log.error("send system message failed. data: {}, topic: {}", data, getBroadcastTopicName(), throwable);
+                    return;
+                }
+                if (SendStatus.SEND_OK != result.getSendStatus()) {
+                    log.error("send system message failed. data: {}, topic: {}, sendResult:{}", data, getBroadcastTopicName(), result);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("send system message failed. data: {}, topic: {}", data, targetTopic, t);
+        }
+    }
+
+    /**
+     * Builds the request header for a plain, non-batch message sent to {@code queueId} of the message's topic.
+     *
+     * @param message       message whose topic, flag and properties are copied into the header
+     * @param producerGroup producer group to report
+     * @param queueId       target queue id
+     */
+    protected SendMessageRequestHeader buildSendMessageRequestHeader(Message message,
+        String producerGroup, int queueId) {
+        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+
+        requestHeader.setProducerGroup(producerGroup);
+        requestHeader.setTopic(message.getTopic());
+        requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC);
+        requestHeader.setDefaultTopicQueueNums(0);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setSysFlag(0);
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setFlag(message.getFlag());
+        requestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
+        requestHeader.setReconsumeTimes(0);
+        requestHeader.setBatch(false);
+        return requestHeader;
+    }
+
+    /**
+     * Makes sure the system topic exists, then creates, configures and starts the broadcast push consumer
+     * (consuming from the last offset) with this instance as its listener.
+     *
+     * @throws ProxyException if the system topic cannot be created or the subscription fails
+     */
+    @Override
+    public void start() throws Exception {
+        this.createSysTopic();
+        RPCHook rpcHook = this.getRpcHook();
+        this.defaultMQPushConsumer = new DefaultMQPushConsumer(null, this.getSystemMessageConsumerId(), rpcHook);
+
+        this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
+        try {
+            this.defaultMQPushConsumer.subscribe(this.getBroadcastTopicName(), this.getSubTag());
+        } catch (MQClientException e) {
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "subscribe to broadcast topic " + this.getBroadcastTopicName() + " failed. " + e.getMessage());
+        }
+        this.defaultMQPushConsumer.registerMessageListener(this);
+        this.defaultMQPushConsumer.start();
+    }
+
+    /**
+     * Creates the broadcast topic on the configured cluster unless the admin service reports that it
+     * already exists.
+     *
+     * @throws ProxyException if no cluster name is configured or the creation is reported as failed
+     */
+    protected void createSysTopic() {
+        if (this.adminService.topicExist(this.getBroadcastTopicName())) {
+            return;
+        }
+
+        String clusterName = this.getBroadcastTopicClusterName();
+        if (StringUtils.isEmpty(clusterName)) {
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "system topic cluster cannot be empty");
+        }
+
+        boolean createSuccess = this.adminService.createTopicOnTopicBrokerIfNotExist(
+            this.getBroadcastTopicName(),
+            clusterName,
+            this.getBroadcastTopicQueueNum(),
+            this.getBroadcastTopicQueueNum(),
+            true,
+            3
+        );
+        if (!createSuccess) {
+            throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "create system broadcast topic " + this.getBroadcastTopicName() + " failed on cluster " + clusterName);
+        }
+    }
+
+    /**
+     * Shuts down the push consumer if one was created.
+     */
+    @Override
+    public void shutdown() throws Exception {
+        // start() assigns the consumer only after createSysTopic() succeeded, so the field is still null
+        // when start() was never called or failed early; shutting down must not throw in that case.
+        if (this.defaultMQPushConsumer != null) {
+            this.defaultMQPushConsumer.shutdown();
+        }
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
new file mode 100644
index 000000000..ce3403766
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
+import org.apache.rocketmq.proxy.service.admin.AdminService;
+import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+
+/**
+ * Publishes consumer register/unregister events of non-remote channels as {@link HeartbeatSyncerData}
+ * system messages, and applies the events received from the system topic to the local
+ * {@link ConsumerManagerInterface}.
+ *
+ * <p>Messages whose {@code connectProxyIp} equals this proxy's local serve address are ignored on the
+ * consuming side, so a proxy does not re-apply its own events.
+ */
+public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
+
+    protected ThreadPoolExecutor threadPoolExecutor;
+    protected ConsumerManagerInterface consumerManager;
+    protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>();
+
+    public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
+        ConsumerManagerInterface consumerManager, MQClientAPIFactory mqClientAPIFactory) {
+        super(topicRouteService, adminService, mqClientAPIFactory);
+        this.consumerManager = consumerManager;
+        this.init();
+    }
+
+    /**
+     * Creates the monitored thread pool used for sending, and registers a listener on the consumer manager
+     * that drops the cached {@link RemoteChannel} of a client once a {@code CLIENT_UNREGISTER} event for
+     * its channel is observed.
+     */
+    protected void init() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        this.threadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            proxyConfig.getHeartbeatSyncerThreadPoolNums(),
+            1,
+            TimeUnit.MINUTES,
+            "HeartbeatSyncer",
+            proxyConfig.getHeartbeatSyncerThreadPoolQueueCapacity()
+        );
+        this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
+            @Override
+            public void handle(ConsumerGroupEvent event, String s, Object... args) {
+                if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
+                    if (args == null || args.length < 1) {
+                        return;
+                    }
+                    if (args[0] instanceof ClientChannelInfo) {
+                        ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+                        remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
+                    }
+                }
+            }
+
+            @Override
+            public void shutdown() {
+
+            }
+        });
+    }
+
+    /** Stops the sending thread pool, then shuts down the inherited consumer. */
+    @Override
+    public void shutdown() throws Exception {
+        this.threadPoolExecutor.shutdown();
+        super.shutdown();
+    }
+
+    /**
+     * Broadcasts a {@link HeartbeatType#REGISTER} message for a consumer. Nothing is sent when
+     * {@code clientChannelInfo} is {@code null}, when its channel is already a remote channel, or when no
+     * {@link RemoteChannel} can be created from it. The work runs on the thread pool; failures are logged.
+     */
+    public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo,
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        Set<SubscriptionData> subList) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+                    if (remoteChannel == null) {
+                        return;
+                    }
+                    HeartbeatSyncerData data = new HeartbeatSyncerData(
+                        HeartbeatType.REGISTER,
+                        clientChannelInfo.getClientId(),
+                        clientChannelInfo.getLanguage(),
+                        clientChannelInfo.getVersion(),
+                        consumerGroup,
+                        consumeType,
+                        messageModel,
+                        consumeFromWhere,
+                        proxyConfig.getLocalServeAddr(),
+                        remoteChannel.encode(),
+                        remoteChannel.getChannelExtendAttribute()
+                    );
+                    data.setSubscriptionDataSet(subList);
+
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    log.error("heartbeat register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                        consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit register broadcast failed. group:{}, clientChannelInfo:{}, consumeType:{}, messageModel:{}, consumeFromWhere:{}, subList:{}",
+                consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, t);
+        }
+    }
+
+    /**
+     * Broadcasts a {@link HeartbeatType#UNREGISTER} message for a consumer, under the same skip conditions
+     * as {@link #onConsumerRegister}. Consume type, message model and consume-from-where are sent as
+     * {@code null}.
+     */
+    public void onConsumerUnRegister(String consumerGroup, ClientChannelInfo clientChannelInfo) {
+        if (clientChannelInfo == null || ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+            return;
+        }
+        try {
+            this.threadPoolExecutor.submit(() -> {
+                try {
+                    ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+                    RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel());
+                    if (remoteChannel == null) {
+                        return;
+                    }
+                    HeartbeatSyncerData data = new HeartbeatSyncerData(
+                        HeartbeatType.UNREGISTER,
+                        clientChannelInfo.getClientId(),
+                        clientChannelInfo.getLanguage(),
+                        clientChannelInfo.getVersion(),
+                        consumerGroup,
+                        null,
+                        null,
+                        null,
+                        proxyConfig.getLocalServeAddr(),
+                        remoteChannel.encode(),
+                        remoteChannel.getChannelExtendAttribute()
+                    );
+
+                    this.sendSystemMessage(data);
+                } catch (Throwable t) {
+                    // Two placeholders for two values; the trailing throwable is logged as the exception.
+                    log.error("heartbeat unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                        consumerGroup, clientChannelInfo, t);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("heartbeat submit unregister broadcast failed. group:{}, clientChannelInfo:{}",
+                consumerGroup, clientChannelInfo, t);
+        }
+    }
+
+    /**
+     * Applies each received heartbeat message to the local consumer manager: {@code REGISTER} messages
+     * register the consumer, any other type unregisters it. Messages that carry this proxy's own local
+     * serve address are skipped. A failure on one message is logged and does not stop the remaining ones.
+     *
+     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
+     */
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        if (msgs == null || msgs.isEmpty()) {
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+
+        for (MessageExt msg : msgs) {
+            try {
+                HeartbeatSyncerData data = JSON.parseObject(new String(msg.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class);
+                if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) {
+                    continue;
+                }
+
+                RemoteChannel channel = RemoteChannel.decode(data.getChannelData());
+                RemoteChannel finalChannel = channel;
+                // Reuse the channel instance already cached for this channel id, if there is one.
+                channel = remoteChannelMap.computeIfAbsent(channel.id().asLongText(), key -> finalChannel);
+                channel.setExtendAttribute(data.getChannelExtendAttribute());
+                ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                    channel,
+                    data.getClientId(),
+                    data.getLanguage(),
+                    data.getVersion()
+                );
+                if (data.getHeartbeatType().equals(HeartbeatType.REGISTER)) {
+                    this.consumerManager.registerConsumer(
+                        data.getGroup(),
+                        clientChannelInfo,
+                        data.getConsumeType(),
+                        data.getMessageModel(),
+                        data.getConsumeFromWhere(),
+                        data.getSubscriptionDataSet(),
+                        false
+                    );
+                } else {
+                    this.consumerManager.unregisterConsumer(
+                        data.getGroup(),
+                        clientChannelInfo,
+                        false
+                    );
+                }
+            } catch (Throwable t) {
+                log.error("heartbeat consume message failed. msg:{}, data:{}", msg, new String(msg.getBody(), StandardCharsets.UTF_8), t);
+            }
+        }
+
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
new file mode 100644
index 000000000..20fee7aac
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+import java.util.Set;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+/**
+ * Mutable bean describing one consumer register/unregister event. It is serialized to JSON when sent
+ * and parsed back from JSON by {@code HeartbeatSyncer}.
+ */
+public class HeartbeatSyncerData {
+    private HeartbeatType heartbeatType;
+    private String clientId;
+    private LanguageCode language;
+    private int version;
+    /** Defaults to the creation time of this object unless overwritten through the setter. */
+    private long lastUpdateTimestamp = System.currentTimeMillis();
+    private Set<SubscriptionData> subscriptionDataSet;
+    private String group;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private ConsumeFromWhere consumeFromWhere;
+    /** Filled by the sender with its local serve address; receivers compare it with their own. */
+    private String connectProxyIp;
+    /** Encoded form of the sender-side channel. */
+    private String channelData;
+    private String channelExtendAttribute;
+
+    /**
+     * Creates an empty instance to be populated through the setters. This lets a JSON library build the
+     * bean without having to match JSON keys to constructor parameter names.
+     */
+    public HeartbeatSyncerData() {
+    }
+
+    public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId,
+        LanguageCode language, int version, String group,
+        ConsumeType consumeType, MessageModel messageModel,
+        ConsumeFromWhere consumeFromWhere, String connectProxyIp,
+        String channelData, String channelExtendAttribute) {
+        this.heartbeatType = heartbeatType;
+        this.clientId = clientId;
+        this.language = language;
+        this.version = version;
+        this.group = group;
+        this.consumeType = consumeType;
+        this.messageModel = messageModel;
+        this.consumeFromWhere = consumeFromWhere;
+        this.connectProxyIp = connectProxyIp;
+        this.channelData = channelData;
+        this.channelExtendAttribute = channelExtendAttribute;
+    }
+
+    public HeartbeatType getHeartbeatType() {
+        return heartbeatType;
+    }
+
+    public void setHeartbeatType(HeartbeatType heartbeatType) {
+        this.heartbeatType = heartbeatType;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+    public void setLanguage(LanguageCode language) {
+        this.language = language;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public Set<SubscriptionData> getSubscriptionDataSet() {
+        return subscriptionDataSet;
+    }
+
+    public void setSubscriptionDataSet(
+        Set<SubscriptionData> subscriptionDataSet) {
+        this.subscriptionDataSet = subscriptionDataSet;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+    public String getConnectProxyIp() {
+        return connectProxyIp;
+    }
+
+    public void setConnectProxyIp(String connectProxyIp) {
+        this.connectProxyIp = connectProxyIp;
+    }
+
+    public String getChannelData() {
+        return channelData;
+    }
+
+    public void setChannelData(String channelData) {
+        this.channelData = channelData;
+    }
+
+    public String getChannelExtendAttribute() {
+        return channelExtendAttribute;
+    }
+
+    public void setChannelExtendAttribute(String channelExtendAttribute) {
+        this.channelExtendAttribute = channelExtendAttribute;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java new file mode 100644 index 000000000..8f0801f54 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.proxy.service.sysmessage;
+
+/**
+ * Kind of consumer event carried by a {@code HeartbeatSyncerData} message.
+ */
+public enum HeartbeatType {
+    // Sent when a consumer registers; receivers register the consumer locally.
+    REGISTER,
+    // Sent when a consumer unregisters; receivers unregister the consumer locally.
+    UNREGISTER;
+}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
index 3a50d842f..eb90b9205 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -64,6 +65,8 @@ public class ProxyClientRemotingProcessorTest {
     @Mock
     private ProducerManager producerManager;
     @Mock
+    private GrpcClientSettingsManager grpcClientSettingsManager;
+    @Mock
     private ProxyRelayService proxyRelayService;
 
     @Test
@@ -74,7 +77,7 @@ public class ProxyClientRemotingProcessorTest {
                 new TransactionData("brokerName", 0, 0, "id", System.currentTimeMillis(), 3000),
                 proxyRelayResultFuture));
 
-        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, null,
+        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, null,
             ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "clientId");
         when(producerManager.getAvailableChannel(anyString()))
             .thenReturn(grpcClientChannel);
