This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b713aebfa71cc29ca195aca4664625f4c51ebe9c
Author: zhouxiang <[email protected]>
AuthorDate: Tue Dec 6 14:24:34 2022 +0800

    [ISSUE #5485] Fix by code review
---
 .../apache/rocketmq/common/attribute/TopicMessageType.java  |  3 +--
 .../proxy/service/message/ClusterMessageService.java        | 13 +++++--------
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 5e6629e3b..77629e4c9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -53,9 +53,8 @@ public enum TopicMessageType {
             return TopicMessageType.DELAY;
         } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) {
             return TopicMessageType.FIFO;
-        } else {
-            return TopicMessageType.NORMAL;
         }
+        return TopicMessageType.NORMAL;
     }
 
     public String getMetricsValue() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index c2a5a6435..872b16f51 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
@@ -212,10 +213,8 @@ public class ClusterMessageService implements MessageService {
         try {
             String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
             return mqClientAPIFactory.getClient().invoke(brokerAddress, request, timeoutMillis);
-        } catch (Exception e) {
-            CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
+        } catch (Throwable t) {
+            return FutureUtils.completeExceptionally(t);
         }
     }
 
@@ -225,10 +224,8 @@ public class ClusterMessageService implements MessageService {
         try {
             String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
             return mqClientAPIFactory.getClient().invokeOneway(brokerAddress, request, timeoutMillis);
-        } catch (Exception e) {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
+        } catch (Throwable t) {
+            return FutureUtils.completeExceptionally(t);
         }
     }
