This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 96bab46bd4d3cf3aaa9b2c07acec15907821048b
Author: kaiyi.lk <[email protected]>
AuthorDate: Thu Nov 17 14:20:28 2022 +0800

    [ISSUE #5485] polish channel management
---
 .../rocketmq/proxy/common/utils/FutureUtils.java   |  6 ++++
 .../proxy/remoting/channel/RemotingChannel.java    | 23 ++++++---------
 .../remoting/channel/RemotingChannelManager.java   | 33 +++++++++++-----------
 .../proxy/remoting/common/RemotingConverter.java   | 23 ---------------
 .../proxy/service/admin/DefaultAdminService.java   |  6 +++-
 5 files changed, 36 insertions(+), 55 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java
index 2e194a8cb..ea50e64ee 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/FutureUtils.java
@@ -37,4 +37,10 @@ public class FutureUtils {
     public static <T> CompletableFuture<T> addExecutor(CompletableFuture<T> 
future, ExecutorService executor) {
         return appendNextFuture(future, new CompletableFuture<>(), executor);
     }
+
+    public static <T> CompletableFuture<T> completeExceptionally(Throwable t) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        future.completeExceptionally(t);
+        return future;
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
index 8b4832cad..806b35de2 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -38,6 +38,7 @@ import 
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import 
org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
 import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
@@ -132,54 +133,48 @@ public class RemotingChannel extends ProxyChannel 
implements RemoteChannelConver
     protected CompletableFuture<Void> 
processGetConsumerRunningInfo(RemotingCommand command,
         GetConsumerRunningInfoRequestHeader header,
         CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> 
responseFuture) {
-        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         try {
             RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, 
header);
-            return this.remotingProxyOutClient.invokeToClient(this.parent(), 
request, DEFAULT_MQ_CLIENT_TIMEOUT)
-                .thenApply(response -> {
+            this.remotingProxyOutClient.invokeToClient(this.parent(), request, 
DEFAULT_MQ_CLIENT_TIMEOUT)
+                .thenAccept(response -> {
                     if (response.getCode() == ResponseCode.SUCCESS) {
                         ConsumerRunningInfo consumerRunningInfo = 
ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
                         responseFuture.complete(new 
ProxyRelayResult<>(ResponseCode.SUCCESS, "", consumerRunningInfo));
-                        return null;
                     }
                     String errMsg = String.format("get consumer running info 
failed, code:%s remark:%s", response.getCode(), response.getRemark());
                     RuntimeException e = new RuntimeException(errMsg);
                     responseFuture.completeExceptionally(e);
-                    throw e;
                 });
+            return CompletableFuture.completedFuture(null);
         } catch (Throwable t) {
             responseFuture.completeExceptionally(t);
-            writeFuture.completeExceptionally(t);
+            return FutureUtils.completeExceptionally(t);
         }
-        return writeFuture;
     }
 
     @Override
     protected CompletableFuture<Void> 
processConsumeMessageDirectly(RemotingCommand command,
         ConsumeMessageDirectlyResultRequestHeader header, MessageExt 
messageExt,
         CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> 
responseFuture) {
-        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         try {
             RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, 
header);
             
request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt));
 
-            return this.remotingProxyOutClient.invokeToClient(this.parent(), 
request, DEFAULT_MQ_CLIENT_TIMEOUT)
-                .thenApply(response -> {
+            this.remotingProxyOutClient.invokeToClient(this.parent(), request, 
DEFAULT_MQ_CLIENT_TIMEOUT)
+                .thenAccept(response -> {
                     if (response.getCode() == ResponseCode.SUCCESS) {
                         ConsumeMessageDirectlyResult result = 
ConsumeMessageDirectlyResult.decode(response.getBody(), 
ConsumeMessageDirectlyResult.class);
                         responseFuture.complete(new 
ProxyRelayResult<>(ResponseCode.SUCCESS, "", result));
-                        return null;
                     }
                     String errMsg = String.format("consume message directly 
failed, code:%s remark:%s", response.getCode(), response.getRemark());
                     RuntimeException e = new RuntimeException(errMsg);
                     responseFuture.completeExceptionally(e);
-                    throw e;
                 });
+            return CompletableFuture.completedFuture(null);
         } catch (Throwable t) {
             responseFuture.completeExceptionally(t);
-            writeFuture.completeExceptionally(t);
+            return FutureUtils.completeExceptionally(t);
         }
-        return writeFuture;
     }
 
     public String getClientId() {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
index bdc6457e7..d47884b61 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
@@ -57,10 +57,6 @@ public class RemotingChannelManager implements 
StartAndShutdown {
         return prefix + group;
     }
 
-    protected String getGroupFromKey(String key) {
-        return key.substring(1);
-    }
-
     public RemotingChannel createProducerChannel(Channel channel, String 
group, String clientId) {
         return createChannel(channel, buildProducerKey(group), clientId, 
Collections.emptySet());
     }
@@ -80,14 +76,6 @@ public class RemotingChannelManager implements 
StartAndShutdown {
         return getChannel(group, channel);
     }
 
-    public RemotingChannel getConsumerChannel(String group, Channel channel) {
-        return getChannel(buildConsumerKey(group), channel);
-    }
-
-    public RemotingChannel getProducerChannel(String group, Channel channel) {
-        return getChannel(buildProducerKey(group), channel);
-    }
-
     protected RemotingChannel getChannel(String group, Channel channel) {
         Map<Channel, RemotingChannel> clientIdChannelMap = 
this.groupChannelMap.get(group);
         if (clientIdChannelMap == null) {
@@ -98,10 +86,9 @@ public class RemotingChannelManager implements 
StartAndShutdown {
 
     public Set<RemotingChannel> removeChannel(Channel channel) {
         Set<RemotingChannel> removedChannelSet = new HashSet<>();
-        for (Map.Entry<String, Map<Channel /* raw channel */, 
RemotingChannel>> entry : groupChannelMap.entrySet()) {
-            Map<Channel /* raw channel */, RemotingChannel> channelMap = 
entry.getValue();
-
-            RemotingChannel remotingChannel = channelMap.remove(channel);
+        Set<String> groupKeySet = groupChannelMap.keySet();
+        for (String group : groupKeySet) {
+            RemotingChannel remotingChannel = removeChannel(group, channel);
             if (remotingChannel != null) {
                 removedChannelSet.add(remotingChannel);
             }
@@ -121,7 +108,7 @@ public class RemotingChannelManager implements 
StartAndShutdown {
         AtomicReference<RemotingChannel> channelRef = new AtomicReference<>();
 
         this.groupChannelMap.computeIfPresent(group, (groupKey, channelMap) -> 
{
-            channelRef.set(channelMap.remove(channel));
+            channelRef.set(channelMap.remove(getOrgRawChannel(channel)));
             if (channelMap.isEmpty()) {
                 return null;
             }
@@ -130,6 +117,18 @@ public class RemotingChannelManager implements 
StartAndShutdown {
         return channelRef.get();
     }
 
+    /**
+     * to get the org channel pass by nettyRemotingServer
+     * @param channel
+     * @return
+     */
+    protected Channel getOrgRawChannel(Channel channel) {
+        if (channel instanceof RemotingChannel) {
+            return channel.parent();
+        }
+        return channel;
+    }
+
     @Override
     public void shutdown() throws Exception {
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
index 55af1ff19..a08abbba2 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.proxy.remoting.common;
 
-import java.nio.ByteBuffer;
-import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -42,27 +40,6 @@ public class RemotingConverter {
         return instance;
     }
 
-    public byte[] convertMsgToBytes(List<MessageExt> msgList) {
-        // set response body
-        byte[][] msgBufferList = new byte[msgList.size()][];
-        int bodyTotalSize = 0;
-        for (int i = 0; i < msgList.size(); i++) {
-            try {
-                msgBufferList[i] = convertMsgToBytes(msgList.get(i));
-                bodyTotalSize += msgBufferList[i].length;
-            } catch (Exception e) {
-                log.error("messageToByteBuffer UnsupportedEncodingException", 
e);
-            }
-        }
-
-        ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
-        for (byte[] bb : msgBufferList) {
-            body.put(bb);
-        }
-
-        return body.array();
-    }
-
     public byte[] convertMsgToBytes(final MessageExt msg) throws Exception {
         // change to 0 for recalculate storeSize
         msg.setStoreSize(0);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
index 4f3b407d6..e94c0879b 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
@@ -115,7 +115,11 @@ public class DefaultAdminService implements AdminService {
                 continue;
             }
 
-            this.getClient().createTopic(addr, 
TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topicConfig, 
Duration.ofSeconds(3).toMillis());
+            try {
+                this.getClient().createTopic(addr, 
TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topicConfig, 
Duration.ofSeconds(3).toMillis());
+            } catch (Exception e) {
+                log.error("create topic on broker failed. topic:{}, 
broker:{}", topicConfig, addr, e);
+            }
         }
 
         if (examineTopic) {

Reply via email to