This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 416427de1ec70e00a8bcd881ebd0571906a10d49 Author: zhouxiang <[email protected]> AuthorDate: Wed Nov 23 09:49:32 2022 +0800 [ISSUE #5392] Adapt for logging and module refector --- .../rocketmq/broker/client/ConsumerManager.java | 2 +- .../broker/client/ConsumerManagerInterface.java | 6 ++-- .../rocketmq/broker/client/ProducerManager.java | 2 +- .../broker/client/ProducerManagerInterface.java | 2 +- .../apache/rocketmq/proxy/config/ProxyConfig.java | 3 +- .../processor/channel/RemoteChannelSerializer.java | 6 ++-- .../remoting/MultiProtocolRemotingServer.java | 4 +-- .../proxy/remoting/MultiProtocolTlsHelper.java | 4 +-- .../proxy/remoting/RemotingProtocolServer.java | 8 +++--- .../activity/AbstractRemotingActivity.java | 14 +++++----- .../remoting/activity/ClientManagerActivity.java | 14 +++++----- .../remoting/activity/ConsumerManagerActivity.java | 14 +++++----- .../remoting/activity/GetTopicRouteActivity.java | 6 ++-- .../remoting/activity/PopMessageActivity.java | 2 +- .../remoting/activity/PullMessageActivity.java | 6 ++-- .../remoting/activity/SendMessageActivity.java | 6 ++-- .../remoting/activity/TransactionActivity.java | 4 +-- .../proxy/remoting/channel/RemotingChannel.java | 32 +++++++++++----------- .../remoting/channel/RemotingChannelManager.java | 6 ++-- .../proxy/remoting/common/RemotingConverter.java | 6 ++-- .../http2proxy/Http2ProtocolProxyHandler.java | 6 ++-- .../http2proxy/Http2ProxyBackendHandler.java | 6 ++-- .../http2proxy/Http2ProxyFrontendHandler.java | 6 ++-- .../rocketmq/proxy/service/admin/AdminService.java | 2 +- .../proxy/service/admin/DefaultAdminService.java | 10 +++---- .../service/client/ClusterConsumerManager.java | 6 ++-- .../sysmessage/AbstractSystemMessageSyncer.java | 10 +++---- .../proxy/service/sysmessage/HeartbeatSyncer.java | 6 ++-- .../service/sysmessage/HeartbeatSyncerData.java | 6 ++-- .../grpc/v2/channel/GrpcClientChannelTest.java | 4 +-- .../activity/AbstractRemotingActivityTest.java | 8 +++--- .../remoting/activity/PullMessageActivityTest.java | 12 ++++---- .../remoting/activity/SendMessageActivityTest.java | 10 +++---- .../remoting/channel/RemotingChannelTest.java | 14 +++++----- .../service/admin/DefaultAdminServiceTest.java | 6 ++-- .../service/sysmessage/HeartbeatSyncerTest.java | 18 ++++++------ 36 files changed, 139 insertions(+), 138 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 0582ce75e..a70e8579e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.store.stats.BrokerStatsManager; -public class ConsumerManager { +public class ConsumerManager implements ConsumerManagerInterface { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<>(1024); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java index 895a2e491..6998f60e7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java @@ -20,9 +20,9 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; import java.util.Set; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public interface ConsumerManagerInterface { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 047aa8be9..a3ed9c590 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -35,7 +35,7 @@ import org.apache.rocketmq.remoting.protocol.body.ProducerInfo; import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo; import org.apache.rocketmq.store.stats.BrokerStatsManager; -public class ProducerManager { +public class ProducerManager implements ProducerManagerInterface { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java index 3f2ece7cd..5e2e7e5b0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.broker.client; import io.netty.channel.Channel; -import org.apache.rocketmq.common.protocol.body.ProducerTableInfo; +import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo; public interface ProducerManagerInterface { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index bd7cf1113..e0f971202 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.ProxyMode; @@ -241,7 +242,7 @@ public class ProxyConfig implements ConfigFile { public void initData() { parseDelayLevel(); if (StringUtils.isEmpty(localServeAddr)) { - this.localServeAddr = RemotingUtil.getLocalAddress(); + this.localServeAddr = NetworkUtil.getLocalAddress(); } if (StringUtils.isBlank(localServeAddr)) { throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "get local serve ip failed"); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java index 8fd216219..a22401a5f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java @@ -23,11 +23,11 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class RemoteChannelSerializer { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private static final String REMOTE_PROXY_IP_KEY = "remoteProxyIp"; private static final String REMOTE_ADDRESS_KEY = "remoteAddress"; private static final String LOCAL_ADDRESS_KEY = "localAddress"; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java index 02e3a545e..74fb3616c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java @@ -23,6 +23,8 @@ import io.netty.handler.timeout.IdleStateHandler; import java.io.IOException; import java.security.cert.CertificateException; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.remoting.protocol.ProtocolNegotiationHandler; @@ -33,8 +35,6 @@ import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * support remoting and http2 protocol at one port diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java index 54af7bc9e..59342ca3c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java @@ -34,9 +34,9 @@ import java.nio.file.Paths; import java.security.cert.CertificateException; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.netty.TlsHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerAuthClient; import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerCertPath; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index adbc21169..a7cc7af47 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -32,10 +32,10 @@ import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; @@ -59,8 +59,8 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java index 650c38614..a66ee6e04 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java @@ -25,8 +25,9 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; @@ -35,11 +36,10 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; -import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; public abstract class AbstractRemotingActivity implements NettyRequestProcessor { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -119,8 +119,8 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor context.setAction("Remoting" + request.getCode()) .setLanguage(request.getLanguage().name()) .setChannel(ctx.channel()) - .setLocalAddress(RemotingUtil.socketAddress2String(ctx.channel().localAddress())) - .setRemoteAddress(RemotingUtil.socketAddress2String(ctx.channel().remoteAddress())); + .setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress())) + .setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress())); return context; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java index 10f7fa324..1009e4204 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java @@ -25,13 +25,13 @@ import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerChangeListener; import org.apache.rocketmq.broker.client.ProducerGroupEvent; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; -import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; -import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; -import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; +import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java index 734b1dad1..1c1993ff0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -24,13 +24,13 @@ import java.util.List; import java.util.Set; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; -import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; -import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; +import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java index 670d9735c..9972c26c9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java @@ -23,9 +23,9 @@ import io.netty.channel.ChannelHandlerContext; import java.util.ArrayList; import java.util.List; import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java index d52b84b12..a635e55cc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.ChannelHandlerContext; import java.time.Duration; -import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java index eb744676a..d548ddc0d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java @@ -20,14 +20,14 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.ChannelHandlerContext; import java.time.Duration; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class PullMessageActivity extends AbstractRemotingActivity { public PullMessageActivity(RequestPipeline requestPipeline, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java index 20fab6e57..618d45874 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java @@ -22,9 +22,9 @@ import java.time.Duration; import java.util.Map; import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.protocol.NamespaceUtil; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.NamespaceUtil; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.processor.MessagingProcessor; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java index 24f98a875..bc5e0ca35 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.ChannelHandlerContext; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.processor.MessagingProcessor; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java index 806b35de2..368330115 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java @@ -27,16 +27,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; -import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.common.utils.FutureUtils; import org.apache.rocketmq.proxy.config.ConfigurationManager; @@ -50,12 +43,19 @@ import org.apache.rocketmq.proxy.service.relay.ProxyChannel; import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.proxy.service.transaction.TransactionData; -import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class RemotingChannel extends ProxyChannel implements RemoteChannelConverter, ChannelExtendAttributeGetter { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private static final long DEFAULT_MQ_CLIENT_TIMEOUT = Duration.ofSeconds(3).toMillis(); private final String clientId; private final String remoteAddress; @@ -67,12 +67,12 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver Channel parent, String clientId, Set<SubscriptionData> subscriptionData) { super(proxyRelayService, parent, parent.id(), - RemotingUtil.socketAddress2String(parent.remoteAddress()), - RemotingUtil.socketAddress2String(parent.localAddress())); + NetworkUtil.socketAddress2String(parent.remoteAddress()), + NetworkUtil.socketAddress2String(parent.localAddress())); this.remotingProxyOutClient = remotingProxyOutClient; this.clientId = clientId; - this.remoteAddress = RemotingUtil.socketAddress2String(parent.remoteAddress()); - this.localAddress = RemotingUtil.socketAddress2String(parent.localAddress()); + this.remoteAddress = NetworkUtil.socketAddress2String(parent.remoteAddress()); + this.localAddress = NetworkUtil.socketAddress2String(parent.localAddress()); this.subscriptionData = subscriptionData; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java index d47884b61..6913fc670 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java @@ -26,12 +26,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class RemotingChannelManager implements StartAndShutdown { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java index a08abbba2..2bd53d8de 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java @@ -20,11 +20,11 @@ package org.apache.rocketmq.proxy.remoting.common; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class RemotingConverter { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected static final Object INSTANCE_CREATE_LOCK = new Object(); protected static volatile RemotingConverter instance; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java index 86f1ee921..2ba2d3463 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java @@ -31,15 +31,15 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import javax.net.ssl.SSLException; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Http2ProtocolProxyHandler implements ProtocolHandler { - private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final String LOCAL_HOST = "127.0.0.1"; /** * The int value of "PRI ". Now use 4 bytes to judge protocol, may be has potential risks if there is a new protocol diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java index 53bddfc31..dfcd144af 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java @@ -22,12 +22,12 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter { - private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private final Channel inboundChannel; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java index 8bffdc6d0..775c047b8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java @@ -23,12 +23,12 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter { - private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel. private final Channel outboundChannel; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java index d98d17ff7..a9e6686b4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.proxy.service.admin; import java.util.List; -import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; public interface AdminService { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java index e94c0879b..4dbf21a98 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java @@ -25,17 +25,17 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt; import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; import org.apache.rocketmq.proxy.service.route.TopicRouteHelper; public class DefaultAdminService implements AdminService { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private final MQClientAPIFactory mqClientAPIFactory; public DefaultAdminService(MQClientAPIFactory mqClientAPIFactory) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java index 3bb65b03e..3a98b5ee1 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java @@ -23,9 +23,9 @@ import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.ConsumerManagerInterface; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.service.admin.AdminService; import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java index 72593525e..e0a9fd702 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java @@ -29,11 +29,9 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.common.StartAndShutdown; @@ -44,9 +42,11 @@ import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, MessageListenerConcurrently { - protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final TopicRouteService topicRouteService; protected final AdminService adminService; protected final String systemResourceName; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java index 12504a2f0..041cbcee6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java @@ -33,9 +33,9 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.config.ConfigurationManager; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java index f3b96ac9a..10c6f1206 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java @@ -20,9 +20,9 @@ package org.apache.rocketmq.proxy.service.sysmessage; import com.google.common.base.MoreObjects; import java.util.Set; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.LanguageCode; public class HeartbeatSyncerData { diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java index 70e10bc2b..1bdbdd9be 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java @@ -22,7 +22,7 @@ import apache.rocketmq.v2.Resource; import apache.rocketmq.v2.Settings; import org.apache.commons.lang3.RandomStringUtils; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; @@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class GrpcClientChannelTest extends InitConfigAndLoggerTest { +public class GrpcClientChannelTest extends InitConfigTest { @Mock private ProxyRelayService proxyRelayService; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java index 74eb3cbd8..ecdc1deaf 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java @@ -24,12 +24,12 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.service.channel.SimpleChannel; import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; @@ -53,7 +53,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest { +public class AbstractRemotingActivityTest extends InitConfigTest { AbstractRemotingActivity remotingActivity; @Mock MessagingProcessor messagingProcessorMock; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java index ffbe2ffac..5798e883b 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java @@ -22,16 +22,16 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.sysflag.PullSysFlag; -import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.service.channel.SimpleChannel; import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,7 +49,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class PullMessageActivityTest extends InitConfigAndLoggerTest { +public class PullMessageActivityTest extends InitConfigTest { PullMessageActivity pullMessageActivity; @Mock diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java index e03bc26e0..b88f6677e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java @@ -24,10 +24,10 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.service.channel.SimpleChannel; import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; @@ -49,7 +49,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class SendMessageActivityTest extends InitConfigAndLoggerTest { +public class SendMessageActivityTest extends InitConfigTest { SendMessageActivity sendMessageActivity; @Mock diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java index 840f3e40f..d947fa5d5 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java @@ -21,15 +21,15 @@ import io.netty.channel.Channel; import java.util.HashSet; import java.util.Set; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.rocketmq.common.filter.FilterAPI; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; -import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -42,7 +42,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class RemotingChannelTest extends InitConfigAndLoggerTest { +public class RemotingChannelTest extends InitConfigTest { @Mock private RemotingProxyOutClient remotingProxyOutClient; @Mock @@ -61,8 +61,8 @@ public class RemotingChannelTest extends InitConfigAndLoggerTest { public void before() throws Throwable { super.before(); this.clientId = RandomStringUtils.randomAlphabetic(10); - when(parent.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress(remoteAddress)); - when(parent.localAddress()).thenReturn(RemotingUtil.string2SocketAddress(localAddress)); + when(parent.remoteAddress()).thenReturn(NetworkUtil.string2SocketAddress(remoteAddress)); + when(parent.localAddress()).thenReturn(NetworkUtil.string2SocketAddress(localAddress)); this.subscriptionData = new HashSet<>(); this.subscriptionData.add(FilterAPI.buildSubscriptionData("topic", "subTag")); this.remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java index 039efd8b4..f0e618d11 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java @@ -22,9 +22,9 @@ import java.util.HashSet; import java.util.Set; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt; import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; import org.junit.Before; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index 8ac74f533..45e3942d6 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -38,18 +38,11 @@ import org.apache.rocketmq.broker.client.ConsumerManagerInterface; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; @@ -63,6 +56,13 @@ import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.proxy.service.route.MessageQueueView; import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.assertj.core.util.Lists; import org.jetbrains.annotations.NotNull; import org.junit.Before; @@ -88,7 +88,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class HeartbeatSyncerTest extends InitConfigAndLoggerTest { +public class HeartbeatSyncerTest extends InitConfigTest { @Mock private TopicRouteService topicRouteService; @Mock
