This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 96bab46bd4d3cf3aaa9b2c07acec15907821048b Author: kaiyi.lk <[email protected]> AuthorDate: Thu Nov 17 14:20:28 2022 +0800 [ISSUE #5485] polish channel management --- .../rocketmq/proxy/common/utils/FutureUtils.java | 6 ++++ .../proxy/remoting/channel/RemotingChannel.java | 23 ++++++--------- .../remoting/channel/RemotingChannelManager.java | 33 +++++++++++----------- .../proxy/remoting/common/RemotingConverter.java | 23 --------------- .../proxy/service/admin/DefaultAdminService.java | 6 +++- 5 files changed, 36 insertions(+), 55 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java index 2e194a8cb..ea50e64ee 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java @@ -37,4 +37,10 @@ public class FutureUtils { public static <T> CompletableFuture<T> addExecutor(CompletableFuture<T> future, ExecutorService executor) { return appendNextFuture(future, new CompletableFuture<>(), executor); } + + public static <T> CompletableFuture<T> completeExceptionally(Throwable t) { + CompletableFuture<T> future = new CompletableFuture<>(); + future.completeExceptionally(t); + return future; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java index 8b4832cad..806b35de2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +import org.apache.rocketmq.proxy.common.utils.FutureUtils; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter; import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; @@ -132,54 +133,48 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver protected CompletableFuture<Void> processGetConsumerRunningInfo(RemotingCommand command, GetConsumerRunningInfoRequestHeader header, CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> responseFuture) { - CompletableFuture<Void> writeFuture = new CompletableFuture<>(); try { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); - return this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT) - .thenApply(response -> { + this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT) + .thenAccept(response -> { if (response.getCode() == ResponseCode.SUCCESS) { ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class); responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", consumerRunningInfo)); - return null; } String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark()); RuntimeException e = new RuntimeException(errMsg); responseFuture.completeExceptionally(e); - throw e; }); + return CompletableFuture.completedFuture(null); } catch (Throwable t) { responseFuture.completeExceptionally(t); - writeFuture.completeExceptionally(t); + return FutureUtils.completeExceptionally(t); } - return writeFuture; } @Override protected CompletableFuture<Void> processConsumeMessageDirectly(RemotingCommand command, ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> responseFuture) { - CompletableFuture<Void> writeFuture = new CompletableFuture<>(); try { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, header); request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt)); - return this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT) - .thenApply(response -> { + this.remotingProxyOutClient.invokeToClient(this.parent(), request, DEFAULT_MQ_CLIENT_TIMEOUT) + .thenAccept(response -> { if (response.getCode() == ResponseCode.SUCCESS) { ConsumeMessageDirectlyResult result = ConsumeMessageDirectlyResult.decode(response.getBody(), ConsumeMessageDirectlyResult.class); responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", result)); - return null; } String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark()); RuntimeException e = new RuntimeException(errMsg); responseFuture.completeExceptionally(e); - throw e; }); + return CompletableFuture.completedFuture(null); } catch (Throwable t) { responseFuture.completeExceptionally(t); - writeFuture.completeExceptionally(t); + return FutureUtils.completeExceptionally(t); } - return writeFuture; } public String getClientId() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java index bdc6457e7..d47884b61 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java @@ -57,10 +57,6 @@ public class RemotingChannelManager implements StartAndShutdown { return prefix + group; } - protected String getGroupFromKey(String key) { - return key.substring(1); - } - public RemotingChannel createProducerChannel(Channel channel, String group, String clientId) { return createChannel(channel, buildProducerKey(group), clientId, Collections.emptySet()); } @@ -80,14 +76,6 @@ public class RemotingChannelManager implements StartAndShutdown { return getChannel(group, channel); } - public RemotingChannel getConsumerChannel(String group, Channel channel) { - return getChannel(buildConsumerKey(group), channel); - } - - public RemotingChannel getProducerChannel(String group, Channel channel) { - return getChannel(buildProducerKey(group), channel); - } - protected RemotingChannel getChannel(String group, Channel channel) { Map<Channel, RemotingChannel> clientIdChannelMap = this.groupChannelMap.get(group); if (clientIdChannelMap == null) { @@ -98,10 +86,9 @@ public class RemotingChannelManager implements StartAndShutdown { public Set<RemotingChannel> removeChannel(Channel channel) { Set<RemotingChannel> removedChannelSet = new HashSet<>(); - for (Map.Entry<String, Map<Channel /* raw channel */, RemotingChannel>> entry : groupChannelMap.entrySet()) { - Map<Channel /* raw channel */, RemotingChannel> channelMap = entry.getValue(); - - RemotingChannel remotingChannel = channelMap.remove(channel); + Set<String> groupKeySet = groupChannelMap.keySet(); + for (String group : groupKeySet) { + RemotingChannel remotingChannel = removeChannel(group, channel); if (remotingChannel != null) { removedChannelSet.add(remotingChannel); } @@ -121,7 +108,7 @@ public class RemotingChannelManager implements StartAndShutdown { AtomicReference<RemotingChannel> channelRef = new AtomicReference<>(); this.groupChannelMap.computeIfPresent(group, (groupKey, channelMap) -> { - channelRef.set(channelMap.remove(channel)); + channelRef.set(channelMap.remove(getOrgRawChannel(channel))); if (channelMap.isEmpty()) { return null; } @@ -130,6 +117,18 @@ public class RemotingChannelManager implements StartAndShutdown { return channelRef.get(); } + /** + * to get the org channel pass by nettyRemotingServer + * @param channel + * @return + */ + protected Channel getOrgRawChannel(Channel channel) { + if (channel instanceof RemotingChannel) { + return channel.parent(); + } + return channel; + } + @Override public void shutdown() throws Exception { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java index 55af1ff19..a08abbba2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java @@ -17,8 +17,6 @@ package org.apache.rocketmq.proxy.remoting.common; -import java.nio.ByteBuffer; -import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -42,27 +40,6 @@ public class RemotingConverter { return instance; } - public byte[] convertMsgToBytes(List<MessageExt> msgList) { - // set response body - byte[][] msgBufferList = new byte[msgList.size()][]; - int bodyTotalSize = 0; - for (int i = 0; i < msgList.size(); i++) { - try { - msgBufferList[i] = convertMsgToBytes(msgList.get(i)); - bodyTotalSize += msgBufferList[i].length; - } catch (Exception e) { - log.error("messageToByteBuffer UnsupportedEncodingException", e); - } - } - - ByteBuffer body = ByteBuffer.allocate(bodyTotalSize); - for (byte[] bb : msgBufferList) { - body.put(bb); - } - - return body.array(); - } - public byte[] convertMsgToBytes(final MessageExt msg) throws Exception { // change to 0 for recalculate storeSize msg.setStoreSize(0); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java index 4f3b407d6..e94c0879b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java @@ -115,7 +115,11 @@ public class DefaultAdminService implements AdminService { continue; } - this.getClient().createTopic(addr, TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topicConfig, Duration.ofSeconds(3).toMillis()); + try { + this.getClient().createTopic(addr, TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topicConfig, Duration.ofSeconds(3).toMillis()); + } catch (Exception e) { + log.error("create topic on broker failed. topic:{}, broker:{}", topicConfig, addr, e); + } } if (examineTopic) {
