This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b511e0977 [ISSUE #5095] [Remoting-A] Support logging rpc distribution
in remoting protocol (#5114)
b511e0977 is described below
commit b511e097720bc0e5a05ccd7bac10d488918e3b6d
Author: lizhimins <[email protected]>
AuthorDate: Mon Sep 19 17:07:49 2022 +0800
[ISSUE #5095] [Remoting-A] Support logging rpc distribution in remoting
protocol (#5114)
* [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting
protocol
* [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting
protocol
* [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting
protocol
* [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting
protocol
* [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting
protocol
Co-authored-by: 斜阳 <[email protected]>
---
distribution/conf/logback_broker.xml | 26 ++++++
distribution/conf/logback_namesrv.xml | 26 ++++++
remoting/pom.xml | 5 +-
.../rocketmq/remoting/common/RemotingHelper.java | 1 +
.../remoting/netty/NettyRemotingServer.java | 38 +++++++-
.../netty/RemotingCodeDistributionHandler.java | 100 +++++++++++++++++++++
.../netty/RemotingCodeDistributionHandlerTest.java | 80 +++++++++++++++++
7 files changed, 274 insertions(+), 2 deletions(-)
diff --git a/distribution/conf/logback_broker.xml
b/distribution/conf/logback_broker.xml
index 1186d7fb8..3daa0b2f2 100644
--- a/distribution/conf/logback_broker.xml
+++ b/distribution/conf/logback_broker.xml
@@ -119,6 +119,27 @@
<appender-ref ref="RocketmqStoreAppender_inner"/>
</appender>
+ <appender name="RocketmqTrafficAppender_inner"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${user.home}/logs/rocketmqlogs/broker_traffic.log</file>
+ <append>true</append>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/broker_traffic.%i.log.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>10</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+ <appender name="RocketmqTrafficAppender"
class="ch.qos.logback.classic.AsyncAppender">
+ <appender-ref ref="RocketmqTrafficAppender_inner"/>
+ </appender>
+
<appender name="RocketmqRemotingAppender_inner"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/${brokerLogDir}/remoting.log</file>
@@ -359,6 +380,11 @@
<appender-ref ref="RocketmqPopAppender" />
</logger>
+ <logger name="RocketmqTraffic" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="RocketmqTrafficAppender" />
+ </logger>
+
<root>
<level value="INFO"/>
<appender-ref ref="DefaultAppender"/>
diff --git a/distribution/conf/logback_namesrv.xml
b/distribution/conf/logback_namesrv.xml
index b0f5eca55..f8e0c59ac 100644
--- a/distribution/conf/logback_namesrv.xml
+++ b/distribution/conf/logback_namesrv.xml
@@ -59,6 +59,27 @@
<discardingThreshold>0</discardingThreshold>
</appender>
+ <appender name="RocketmqTrafficAppender_inner"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${user.home}/logs/rocketmqlogs/namesrv_traffic.log</file>
+ <append>true</append>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/namesrv_traffic.%i.log.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>10</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+ <appender name="RocketmqTrafficAppender"
class="ch.qos.logback.classic.AsyncAppender">
+ <appender-ref ref="RocketmqTrafficAppender_inner"/>
+ </appender>
+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<append>true</append>
<encoder>
@@ -92,6 +113,11 @@
<appender-ref ref="RocketmqNamesrvAppender"/>
</logger>
+ <logger name="RocketmqTraffic" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="RocketmqTrafficAppender" />
+ </logger>
+
<root>
<level value="INFO"/>
<appender-ref ref="DefaultAppender"/>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index f567d84ea..84595ac7a 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -44,7 +44,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-logging</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 4c8a62a44..b8b180611 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -34,6 +34,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class RemotingHelper {
+ public static final String ROCKETMQ_TRAFFIC = "RocketmqTraffic";
public static final String ROCKETMQ_REMOTING = "RocketmqRemoting";
public static final String DEFAULT_CHARSET = "UTF-8";
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index e9d5c0dc2..a80434545 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -49,7 +49,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.logging.InternalLogger;
@@ -69,12 +72,16 @@ import
org.apache.rocketmq.remoting.protocol.RemotingCommand;
@SuppressWarnings("NullableProblems")
public class NettyRemotingServer extends NettyRemotingAbstract implements
RemotingServer {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final InternalLogger TRAFFIC_LOGGER =
+ InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_TRAFFIC);
+
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
private final NettyServerConfig nettyServerConfig;
private final ExecutorService publicExecutor;
+ private final ScheduledExecutorService scheduledExecutorService;
private final ChannelEventListener channelEventListener;
private final Timer timer = new Timer("ServerHouseKeepingService", true);
@@ -95,6 +102,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
private NettyServerHandler serverHandler;
+ private RemotingCodeDistributionHandler distributionHandler;
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
@@ -108,9 +116,9 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
this.channelEventListener = channelEventListener;
this.publicExecutor = buildPublicExecutor(nettyServerConfig);
+ this.scheduledExecutorService = buildScheduleExecutor();
this.eventLoopGroupBoss = buildBossEventLoopGroup();
-
this.eventLoopGroupSelector = buildEventLoopGroupSelector();
loadSslContext();
@@ -178,6 +186,15 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
});
}
+ private ScheduledExecutorService buildScheduleExecutor() {
+ return new ScheduledThreadPoolExecutor(1,
+ r -> {
+ Thread thread = new Thread(r, "NettyServerScheduler");
+ thread.setDaemon(true);
+ return thread;
+ }, new ThreadPoolExecutor.DiscardOldestPolicy());
+ }
+
public void loadSslContext() {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
@@ -230,6 +247,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
+ distributionHandler,
new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
@@ -269,6 +287,14 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
}
}, 1000 * 3, 1000);
+
+ scheduledExecutorService.scheduleWithFixedDelay(() -> {
+ try {
+ NettyRemotingServer.this.printRemotingCodeDistribution();
+ } catch (Throwable e) {
+ TRAFFIC_LOGGER.error("NettyRemotingServer print remoting code distribution exception", e);
+ }
+ }, 1, 1, TimeUnit.SECONDS);
}
private void addCustomConfig(ServerBootstrap childHandler) {
@@ -400,6 +426,16 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
+ distributionHandler = new RemotingCodeDistributionHandler();
+ }
+
+ private void printRemotingCodeDistribution() {
+ if (distributionHandler != null) {
+ TRAFFIC_LOGGER.info("Port: {}, RequestCode Distribution: {}",
+ nettyServerConfig.getListenPort(),
distributionHandler.getInBoundSnapshotString());
+ TRAFFIC_LOGGER.info("Port: {}, ResponseCode Distribution: {}",
+ nettyServerConfig.getListenPort(),
distributionHandler.getOutBoundSnapshotString());
+ }
}
@ChannelHandler.Sharable
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java
new file mode 100644
index 000000000..598628b85
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+@ChannelHandler.Sharable
+public class RemotingCodeDistributionHandler extends ChannelDuplexHandler {
+
+ private final ConcurrentMap<Integer, LongAdder> inboundDistribution;
+ private final ConcurrentMap<Integer, LongAdder> outboundDistribution;
+
+ public RemotingCodeDistributionHandler() {
+ inboundDistribution = new ConcurrentHashMap<>();
+ outboundDistribution = new ConcurrentHashMap<>();
+ }
+
+ private void countInbound(int requestCode) {
+ LongAdder item = inboundDistribution.computeIfAbsent(requestCode, k -> new LongAdder());
+ item.increment();
+ }
+
+ private void countOutbound(int responseCode) {
+ LongAdder item = outboundDistribution.computeIfAbsent(responseCode, k -> new LongAdder());
+ item.increment();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ if (msg instanceof RemotingCommand) {
+ RemotingCommand cmd = (RemotingCommand) msg;
+ countInbound(cmd.getCode());
+ }
+ ctx.fireChannelRead(msg);
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
+ if (msg instanceof RemotingCommand) {
+ RemotingCommand cmd = (RemotingCommand) msg;
+ countOutbound(cmd.getCode());
+ }
+ ctx.write(msg, promise);
+ }
+
+ private Map<Integer, Long> getDistributionSnapshot(Map<Integer, LongAdder>
countMap) {
+ Map<Integer, Long> map = new HashMap<>(countMap.size());
+ for (Map.Entry<Integer, LongAdder> entry : countMap.entrySet()) {
+ map.put(entry.getKey(), entry.getValue().sumThenReset());
+ }
+ return map;
+ }
+
+ private String snapshotToString(Map<Integer, Long> distribution) {
+ StringBuilder sb = new StringBuilder("{");
+ if (null != distribution && !distribution.isEmpty()) {
+ boolean first = true;
+ for (Map.Entry<Integer, Long> entry : distribution.entrySet()) {
+ if (0L == entry.getValue()) {
+ continue;
+ }
+ sb.append(first ? "" : ", ").append(entry.getKey()).append(":").append(entry.getValue());
+ first = false;
+ }
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ public String getInBoundSnapshotString() {
+ return
this.snapshotToString(this.getDistributionSnapshot(this.inboundDistribution));
+ }
+
+ public String getOutBoundSnapshotString() {
+ return
this.snapshotToString(this.getDistributionSnapshot(this.outboundDistribution));
+ }
+}
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
new file mode 100644
index 000000000..ee6f3f6c2
--- /dev/null
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class RemotingCodeDistributionHandlerTest {
+
+ private final RemotingCodeDistributionHandler distributionHandler = new
RemotingCodeDistributionHandler();
+
+ @Test
+ public void remotingCodeCountTest() throws Exception {
+ Class<RemotingCodeDistributionHandler> clazz =
RemotingCodeDistributionHandler.class;
+ Method methodIn = clazz.getDeclaredMethod("countInbound", int.class);
+ Method methodOut = clazz.getDeclaredMethod("countOutbound", int.class);
+ methodIn.setAccessible(true);
+ methodOut.setAccessible(true);
+
+ int threadCount = 4;
+ int count = 1000 * 1000;
+ CountDownLatch latch = new CountDownLatch(threadCount);
+ AtomicBoolean result = new AtomicBoolean(true);
+ ExecutorService executorService =
Executors.newFixedThreadPool(threadCount, new ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "RemotingCodeTest_" +
this.threadIndex.incrementAndGet());
+ }
+ });
+
+ for (int i = 0; i < threadCount; i++) {
+ executorService.submit(() -> {
+ try {
+ for (int j = 0; j < count; j++) {
+ methodIn.invoke(distributionHandler, 1);
+ methodOut.invoke(distributionHandler, 2);
+ }
+ } catch (Exception e) {
+ result.set(false);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ latch.await();
+ Assert.assertTrue(result.get());
+ await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(10)).until(() -> {
+ boolean f1 = ("{1:" + count * threadCount +
"}").equals(distributionHandler.getInBoundSnapshotString());
+ boolean f2 = ("{2:" + count * threadCount +
"}").equals(distributionHandler.getOutBoundSnapshotString());
+ return f1 && f2;
+ });
+ }
+}
\ No newline at end of file