This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch benchmark
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit d4c4deed83b936ef6f48b01561c18c3eed6dee6c
Author: lkm <[email protected]>
AuthorDate: Sun Jul 2 15:50:48 2023 +0800

    添加基准测试
---
 adapter/benchmark/pom.xml                          |  58 ++++++++++
 .../adapter/benchmark/AbstractEventCommon.java     | 126 +++++++++++++++++++++
 .../adapter/benchmark/EventBusListenerCommon.java  |  82 ++++++++++++++
 .../adapter/benchmark/EventRuleTransferCommon.java |  82 ++++++++++++++
 .../benchmark/EventTargetTriggerCommon.java        |  82 ++++++++++++++
 .../adapter/benchmark/StatsBenchmarkCommon.java    |  47 ++++++++
 adapter/pom.xml                                    |   1 +
 adapter/runtime/pom.xml                            |   5 +
 .../adapter/runtime/boot/EventBusListener.java     |   3 +
 .../adapter/runtime/boot/EventRuleTransfer.java    |   3 +
 .../adapter/runtime/boot/EventTargetTrigger.java   |   2 +
 .../runtime/boot/common/CirculatorContext.java     |  56 ++++++++-
 pom.xml                                            |  40 +++----
 13 files changed, 566 insertions(+), 21 deletions(-)

diff --git a/adapter/benchmark/pom.xml b/adapter/benchmark/pom.xml
new file mode 100644
index 0000000..ed56a11
--- /dev/null
+++ b/adapter/benchmark/pom.xml
@@ -0,0 +1,58 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+       license agreements. See the NOTICE file distributed with this work for 
additional
+       information regarding copyright ownership. The ASF licenses this file to
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use
+       this file except in compliance with the License. You may obtain a copy 
of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required
+       by applicable law or agreed to in writing, software distributed under 
the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>rocketmq-eventbridge-adapter</artifactId>
+    <groupId>org.apache.rocketmq</groupId>
+    <version>1.0.0</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>rocketmq-eventbridge-adapter-benchmark</artifactId>
+  <version>1.0.0</version>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>8</maven.compiler.source>
+    <maven.compiler.target>8</maven.compiler.target>
+  </properties>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.rocketmq</groupId>
+      <artifactId>rocketmq-common</artifactId>
+      <version>5.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
new file mode 100644
index 0000000..a5c767a
--- /dev/null
+++ 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.benchmark;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.common.UtilAll;
+
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.ThreadMXBean;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.LinkedList;
+import java.util.concurrent.ScheduledExecutorService;
+
+public abstract class AbstractEventCommon {
+    protected OperatingSystemMXBean osMxBean;
+
+    protected ThreadMXBean threadBean;
+
+    protected StatsBenchmarkCommon statsBenchmarkCommon;
+
+    protected LinkedList<Long[]> snapshotList;
+
+    protected PrintStream printStream;
+
+    protected ScheduledExecutorService executorService;
+
+    public AbstractEventCommon() {
+        osMxBean = ManagementFactory.getOperatingSystemMXBean();
+        threadBean = ManagementFactory.getThreadMXBean();
+        statsBenchmarkCommon = new StatsBenchmarkCommon();
+        snapshotList = new LinkedList<>();
+    }
+
+    public void successCount(int batchSize) {
+        statsBenchmarkCommon.getSuccessCount().increment();
+        statsBenchmarkCommon.getRecordCount().add(batchSize);
+    }
+
+    public void failCount() {
+        statsBenchmarkCommon.getFailCount().increment();
+    }
+
+    protected final long MB = 1024 * 1024;
+    protected final long GB = 1024 * 1024 * 1024;
+
+    protected String getSystemState() {
+        String osJson = JSON.toJSONString(osMxBean);
+        JSONObject jsonObject = JSON.parseObject(osJson);
+
+        Long totalPhysicalMemorySize = 
jsonObject.getLong("totalPhysicalMemorySize") / MB;
+        Long freePhysicalMemorySize = 
jsonObject.getLong("freePhysicalMemorySize") / MB;
+        double freePhysicalMemory = (totalPhysicalMemorySize - 
freePhysicalMemorySize * 1.0) / totalPhysicalMemorySize;
+        double freePhysicalMemoryRate = freePhysicalMemory * 100;
+
+        Runtime runtime = Runtime.getRuntime();
+        // java虚拟机中的内存总量,可用内存空间 单位为byte,默认为系统的1/64
+        long totalMemory = runtime.totalMemory();
+        // java虚拟机试图使用的最大内存量 最大可用内存空间 单位byte,默认为系统的1/4
+        long maxMemory = runtime.maxMemory();
+        // java 虚拟机中的空闲内存量 空闲空间 单位byte, 默认为系统的1/4
+        long freeMemory = runtime.freeMemory();
+        double usedMemoryJava = (totalMemory - freeMemory * 1.0) / totalMemory;
+        double usedMemoryJavaRate = usedMemoryJava * 100;
+
+        StringBuilder result = new StringBuilder();
+        result
+                .append("系统总内存:")
+                .append(twoDecimal(totalPhysicalMemorySize / 1024))
+                .append("GB  |  系统内存使用量: ")
+                .append(twoDecimal(freePhysicalMemoryRate))
+                .append("%  |  虚拟机内存总量:")
+                .append(twoDecimal(totalMemory / MB))
+                .append("MB  |  虚拟机内存使用量:")
+                .append(twoDecimal(usedMemoryJavaRate)).append("%");
+
+        return result.toString();
+    }
+
+    protected String twoDecimal(double doubleValue) {
+        BigDecimal bigDecimal = new BigDecimal(doubleValue).setScale(2, 
RoundingMode.HALF_UP);
+        return bigDecimal.toString();
+    }
+
+    protected void printStats() {
+        if (snapshotList.size() >= 10) {
+            Long[] begin = snapshotList.getFirst();
+            Long[] end = snapshotList.getLast();
+
+            final long tps = (long) (((end[1] - begin[1]) / (double) (end[0] - 
begin[0])) * 1000L);
+            final long failCount = end[2] - begin[2];
+
+            // 处理的消息条数
+            long c = end[3] - begin[3];
+            c = c <= 0 ? 1 : c;
+            // 时间
+            long t = end[0] - begin[0];
+            // 条/ms
+            final long delayTime = t / c;
+
+            String sysState = getSystemState();
+
+            String info = String.format("Current Time: %s  |  TPS: %d     |  
delayTime: %d     |  Consume Fail: %d     |  %s",
+                    
UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), tps, delayTime, 
failCount, sysState);
+
+            printStream.println(info);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventBusListenerCommon.java
 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventBusListenerCommon.java
new file mode 100644
index 0000000..6e200ee
--- /dev/null
+++ 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventBusListenerCommon.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.benchmark;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class EventBusListenerCommon extends AbstractEventCommon {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(EventBusListenerCommon.class);
+
+    private String listenerFileName = System.getProperty("user.home") + 
"/listenerBM.log";
+
+    public EventBusListenerCommon() {
+        init();
+        start();
+    }
+
+
+    private void init() {
+        executorService = new ScheduledThreadPoolExecutor(1,
+                new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-listener-%d").build());
+
+        try {
+            printStream = new PrintStream(
+                    Files.newOutputStream(Paths.get(listenerFileName), 
StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+                    false,
+                    StandardCharsets.UTF_8.name());
+        } catch (IOException e) {
+            throw new RuntimeException("Create outputStream: " + 
listenerFileName + " for FileSinkTask failed", e);
+        }
+    }
+
+    private void start() {
+
+        executorService.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmarkCommon.createSnapshot());
+                if (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
+
+        executorService.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000, TimeUnit.MILLISECONDS);
+    }
+}
diff --git 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventRuleTransferCommon.java
 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventRuleTransferCommon.java
new file mode 100644
index 0000000..f4c1068
--- /dev/null
+++ 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventRuleTransferCommon.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.benchmark;
+
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class EventRuleTransferCommon extends AbstractEventCommon {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(EventRuleTransferCommon.class);
+
+    private String transferFileName = System.getProperty("user.home") + 
"/transferBM.log";
+
+    public EventRuleTransferCommon() {
+        init();
+        start();
+    }
+
+    private void init() {
+        executorService = new ScheduledThreadPoolExecutor(1,
+                new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-transfer-%d").build());
+
+        try {
+            printStream = new PrintStream(
+                    Files.newOutputStream(Paths.get(transferFileName), 
StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+                    false,
+                    StandardCharsets.UTF_8.name());
+        } catch (IOException e) {
+            throw new RuntimeException("Create outputStream: " + 
transferFileName + " for FileSinkTask failed", e);
+        }
+    }
+
+    private void start() {
+
+        executorService.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmarkCommon.createSnapshot());
+                if (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
+
+        executorService.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000, TimeUnit.MILLISECONDS);
+    }
+}
diff --git 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTargetTriggerCommon.java
 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTargetTriggerCommon.java
new file mode 100644
index 0000000..076a3db
--- /dev/null
+++ 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTargetTriggerCommon.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.benchmark;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class EventTargetTriggerCommon extends AbstractEventCommon {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(EventTargetTriggerCommon.class);
+
+    private String triggerFileName = System.getProperty("user.home") + 
"/triggerBM.log";
+
+    public EventTargetTriggerCommon() {
+        init();
+        start();
+    }
+
+    private void init() {
+        executorService = new ScheduledThreadPoolExecutor(1,
+                new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-trigger-%d").build());
+
+        try {
+            printStream = new PrintStream(
+                    Files.newOutputStream(Paths.get(triggerFileName), 
StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+                    false,
+                    StandardCharsets.UTF_8.name());
+        } catch (IOException e) {
+            throw new RuntimeException("Create outputStream: " + 
triggerFileName + " for FileSinkTask failed", e);
+        }
+    }
+
+    private void start() {
+
+        executorService.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmarkCommon.createSnapshot());
+                if (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
+
+        executorService.scheduleAtFixedRate(new TimerTask() {
+
+            @Override
+            public void run() {
+                try {
+                    printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000, TimeUnit.MILLISECONDS);
+    }
+}
diff --git 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/StatsBenchmarkCommon.java
 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/StatsBenchmarkCommon.java
new file mode 100644
index 0000000..afa9efb
--- /dev/null
+++ 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/StatsBenchmarkCommon.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.benchmark;
+
+import java.util.concurrent.atomic.LongAdder;
+
+public class StatsBenchmarkCommon {
+    private final LongAdder recordCount = new LongAdder();
+    private final LongAdder failCount = new LongAdder();
+    private final LongAdder successCount = new LongAdder();
+
+    public Long[] createSnapshot() {
+        Long[] snap = new Long[]{
+                System.currentTimeMillis(),
+                this.successCount.longValue(),
+                this.failCount.longValue(),
+                this.recordCount.longValue()
+        };
+        return snap;
+    }
+
+    public LongAdder getRecordCount() {
+        return recordCount;
+    }
+
+    public LongAdder getFailCount() {
+        return failCount;
+    }
+
+    public LongAdder getSuccessCount() {
+        return successCount;
+    }
+}
diff --git a/adapter/pom.xml b/adapter/pom.xml
index 887be3e..dc61d42 100644
--- a/adapter/pom.xml
+++ b/adapter/pom.xml
@@ -27,6 +27,7 @@
         <module>rpc</module>
         <module>runtime</module>
         <module>storage</module>
+        <module>benchmark</module>
     </modules>
 
 </project>
\ No newline at end of file
diff --git a/adapter/runtime/pom.xml b/adapter/runtime/pom.xml
index a3f501f..6d6fb21 100644
--- a/adapter/runtime/pom.xml
+++ b/adapter/runtime/pom.xml
@@ -31,6 +31,11 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-adapter-benchmark</artifactId>
+            <version>1.0.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-eventbridge-domain</artifactId>
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index 48af27f..9b185c8 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
+import 
org.apache.rocketmq.eventbridge.adapter.benchmark.EventBusListenerCommon;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
@@ -59,9 +60,11 @@ public class EventBusListener extends ServiceThread {
                     continue;
                 }
                 circulatorContext.offerEventRecords(pullRecordList);
+                circulatorContext.successCount(1,pullRecordList.size());
             } catch (Exception exception) {
                 logger.error(getServiceName() + " - event bus pull record 
exception, stackTrace - ", exception);
                 pullRecordList.forEach(pullRecord -> 
errorHandler.handle(pullRecord, exception));
+                circulatorContext.failCount(1);
             }
         }
     }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index 79dfd00..fdce488 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import javax.annotation.PostConstruct;
 import org.apache.commons.collections.MapUtils;
+import 
org.apache.rocketmq.eventbridge.adapter.benchmark.EventBusListenerCommon;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
@@ -107,10 +108,12 @@ public class EventRuleTransfer extends ServiceThread {
                 }
                 CompletableFuture.allOf(completableFutures.toArray(new 
CompletableFuture[eventRecordMap.values().size()])).get();
                 circulatorContext.offerTargetTaskQueue(afterTransformConnect);
+                circulatorContext.successCount(2,afterTransformConnect.size());
                 logger.info("offer target task queues succeed, transforms - 
{}", JSON.toJSONString(afterTransformConnect));
             } catch (Exception exception) {
                 logger.error("transfer event record failed, stackTrace-", 
exception);
                 afterTransformConnect.forEach(transferRecord -> 
errorHandler.handle(transferRecord, exception));
+                circulatorContext.failCount(2);
             }
 
         }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 85dae3b..ff13dad 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -76,9 +76,11 @@ public class EventTargetTrigger extends ServiceThread {
                     try {
                         sinkTask.put(triggerRecords);
                         offsetManager.commit(triggerRecords);
+                        
circulatorContext.successCount(3,triggerRecords.size());
                     } catch (Exception exception) {
                         logger.error(getServiceName() + " push target 
exception, stackTrace-", exception);
                         triggerRecords.forEach(triggerRecord -> 
errorHandler.handle(triggerRecord, exception));
+                        circulatorContext.failCount(3);
                     }
                 });
             }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 2763be3..5b39a50 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -23,8 +23,12 @@ import 
io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
-import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext;
+import org.apache.rocketmq.eventbridge.adapter.benchmark.AbstractEventCommon;
+import 
org.apache.rocketmq.eventbridge.adapter.benchmark.EventBusListenerCommon;
+import 
org.apache.rocketmq.eventbridge.adapter.benchmark.EventRuleTransferCommon;
+import 
org.apache.rocketmq.eventbridge.adapter.benchmark.EventTargetTriggerCommon;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.LoggerName;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetKeyValue;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
@@ -36,8 +40,10 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,6 +78,54 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     private Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new 
ConcurrentHashMap<>(10);
 
+    private  AbstractEventCommon listenerCommon = null;
+    private  AbstractEventCommon transferCommon = null;
+    private  AbstractEventCommon triggerCommon = null;
+
+    @Value("${rumtimer.benchmark.enable}")
+    private boolean enableBenchmark;
+
+    @PostConstruct
+    private void enableBenchmark(){
+        if (enableBenchmark){
+            listenerCommon = new EventBusListenerCommon();
+            transferCommon = new EventRuleTransferCommon();
+            triggerCommon = new EventTargetTriggerCommon();
+        }
+    }
+
+    public void successCount(int type,int batchSize) {
+        if (enableBenchmark) {
+            switch (type) {
+                case 1:
+                     listenerCommon.successCount(batchSize);
+                     break;
+                case 2:
+                     transferCommon.successCount(batchSize);
+                     break;
+                case 3:
+                     triggerCommon.successCount(batchSize);
+                     break;
+            }
+        }
+    }
+
+    public void failCount(int type) {
+        if (enableBenchmark) {
+            switch (type) {
+                case 1:
+                     listenerCommon.failCount();
+                     break;
+                case 2:
+                     transferCommon.failCount();
+                     break;
+                case 3:
+                     triggerCommon.failCount();
+                     break;
+            }
+        }
+    }
+
     /**
      * initial targetRunnerMap, taskTransformMap, pusherTaskMap
      * @param targetRunnerConfigs
diff --git a/pom.xml b/pom.xml
index 6eaac50..f711003 100644
--- a/pom.xml
+++ b/pom.xml
@@ -376,26 +376,26 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <version>2.17</version>
-                <executions>
-                    <execution>
-                        <id>verify</id>
-                        <phase>verify</phase>
-                        <configuration>
-                            
<configLocation>style/rmq_checkstyle.xml</configLocation>
-                            <encoding>UTF-8</encoding>
-                            <consoleOutput>true</consoleOutput>
-                            <failsOnError>true</failsOnError>
-                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>
-                        </configuration>
-                        <goals>
-                            <goal>check</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
+<!--            <plugin>-->
+<!--                <artifactId>maven-checkstyle-plugin</artifactId>-->
+<!--                <version>2.17</version>-->
+<!--                <executions>-->
+<!--                    <execution>-->
+<!--                        <id>verify</id>-->
+<!--                        <phase>verify</phase>-->
+<!--                        <configuration>-->
+<!--                            
<configLocation>style/rmq_checkstyle.xml</configLocation>-->
+<!--                            <encoding>UTF-8</encoding>-->
+<!--                            <consoleOutput>true</consoleOutput>-->
+<!--                            <failsOnError>true</failsOnError>-->
+<!--                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>-->
+<!--                        </configuration>-->
+<!--                        <goals>-->
+<!--                            <goal>check</goal>-->
+<!--                        </goals>-->
+<!--                    </execution>-->
+<!--                </executions>-->
+<!--            </plugin>-->
             <plugin>
                 <groupId>org.jacoco</groupId>
                 <artifactId>jacoco-maven-plugin</artifactId>

Reply via email to