This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch benchmark in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit d4c4deed83b936ef6f48b01561c18c3eed6dee6c Author: lkm <[email protected]> AuthorDate: Sun Jul 2 15:50:48 2023 +0800 添加基准测试 --- adapter/benchmark/pom.xml | 58 ++++++++++ .../adapter/benchmark/AbstractEventCommon.java | 126 +++++++++++++++++++++ .../adapter/benchmark/EventBusListenerCommon.java | 82 ++++++++++++++ .../adapter/benchmark/EventRuleTransferCommon.java | 82 ++++++++++++++ .../benchmark/EventTargetTriggerCommon.java | 82 ++++++++++++++ .../adapter/benchmark/StatsBenchmarkCommon.java | 47 ++++++++ adapter/pom.xml | 1 + adapter/runtime/pom.xml | 5 + .../adapter/runtime/boot/EventBusListener.java | 3 + .../adapter/runtime/boot/EventRuleTransfer.java | 3 + .../adapter/runtime/boot/EventTargetTrigger.java | 2 + .../runtime/boot/common/CirculatorContext.java | 56 ++++++++- pom.xml | 40 +++---- 13 files changed, 566 insertions(+), 21 deletions(-) diff --git a/adapter/benchmark/pom.xml b/adapter/benchmark/pom.xml new file mode 100644 index 0000000..ed56a11 --- /dev/null +++ b/adapter/benchmark/pom.xml @@ -0,0 +1,58 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-eventbridge-adapter</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>1.0.0</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>rocketmq-eventbridge-adapter-benchmark</artifactId> + <version>1.0.0</version> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + </properties> + + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-common</artifactId> + <version>5.1.1</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java new file mode 100644 index 0000000..a5c767a --- /dev/null +++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.adapter.benchmark; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.apache.rocketmq.common.UtilAll; + +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.lang.management.ThreadMXBean; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.LinkedList; +import java.util.concurrent.ScheduledExecutorService; + +public abstract class AbstractEventCommon { + protected OperatingSystemMXBean osMxBean; + + protected ThreadMXBean threadBean; + + protected StatsBenchmarkCommon statsBenchmarkCommon; + + protected LinkedList<Long[]> snapshotList; + + protected PrintStream printStream; + + protected ScheduledExecutorService executorService; + + public AbstractEventCommon() { + osMxBean = ManagementFactory.getOperatingSystemMXBean(); + threadBean = ManagementFactory.getThreadMXBean(); + statsBenchmarkCommon = new StatsBenchmarkCommon(); + snapshotList = new LinkedList<>(); + } + + public void successCount(int batchSize) { + statsBenchmarkCommon.getSuccessCount().increment(); + statsBenchmarkCommon.getRecordCount().add(batchSize); + } + + public void failCount() { + 
statsBenchmarkCommon.getFailCount().increment(); + } + + protected final long MB = 1024 * 1024; + protected final long GB = 1024 * 1024 * 1024; + + protected String getSystemState() { + String osJson = JSON.toJSONString(osMxBean); + JSONObject jsonObject = JSON.parseObject(osJson); + + Long totalPhysicalMemorySize = jsonObject.getLong("totalPhysicalMemorySize") / MB; + Long freePhysicalMemorySize = jsonObject.getLong("freePhysicalMemorySize") / MB; + double freePhysicalMemory = (totalPhysicalMemorySize - freePhysicalMemorySize * 1.0) / totalPhysicalMemorySize; + double freePhysicalMemoryRate = freePhysicalMemory * 100; + + Runtime runtime = Runtime.getRuntime(); + // java虚拟机中的内存总量,可用内存空间 单位为byte,默认为系统的1/64 + long totalMemory = runtime.totalMemory(); + // java虚拟机试图使用的最大内存量 最大可用内存空间 单位byte,默认为系统的1/4 + long maxMemory = runtime.maxMemory(); + // java 虚拟机中的空闲内存量 空闲空间 单位byte, 默认为系统的1/4 + long freeMemory = runtime.freeMemory(); + double usedMemoryJava = (totalMemory - freeMemory * 1.0) / totalMemory; + double usedMemoryJavaRate = usedMemoryJava * 100; + + StringBuilder result = new StringBuilder(); + result + .append("系统总内存:") + .append(twoDecimal(totalPhysicalMemorySize / 1024)) + .append("GB | 系统内存使用量: ") + .append(twoDecimal(freePhysicalMemoryRate)) + .append("% | 虚拟机内存总量:") + .append(twoDecimal(totalMemory / MB)) + .append("MB | 虚拟机内存使用量:") + .append(twoDecimal(usedMemoryJavaRate)).append("%"); + + return result.toString(); + } + + protected String twoDecimal(double doubleValue) { + BigDecimal bigDecimal = new BigDecimal(doubleValue).setScale(2, RoundingMode.HALF_UP); + return bigDecimal.toString(); + } + + protected void printStats() { + if (snapshotList.size() >= 10) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long tps = (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); + final long failCount = end[2] - begin[2]; + + // 处理的消息条数 + long c = end[3] - begin[3]; + c = c <= 0 ? 
1 : c; + // 时间 + long t = end[0] - begin[0]; + // 条/ms + final long delayTime = t / c; + + String sysState = getSystemState(); + + String info = String.format("Current Time: %s | TPS: %d | delayTime: %d | Consume Fail: %d | %s", + UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), tps, delayTime, failCount, sysState); + + printStream.println(info); + } + } +} \ No newline at end of file diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventBusListenerCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventBusListenerCommon.java new file mode 100644 index 0000000..6e200ee --- /dev/null +++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventBusListenerCommon.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.eventbridge.adapter.benchmark; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class EventBusListenerCommon extends AbstractEventCommon { + + private static final Logger logger = LoggerFactory.getLogger(EventBusListenerCommon.class); + + private String listenerFileName = System.getProperty("user.home") + "/listenerBM.log"; + + public EventBusListenerCommon() { + init(); + start(); + } + + + private void init() { + executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-listener-%d").build()); + + try { + printStream = new PrintStream( + Files.newOutputStream(Paths.get(listenerFileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND), + false, + StandardCharsets.UTF_8.name()); + } catch (IOException e) { + throw new RuntimeException("Create outputStream: " + listenerFileName + " for FileSinkTask failed", e); + } + } + + private void start() { + + executorService.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmarkCommon.createSnapshot()); + if (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000, TimeUnit.MILLISECONDS); + + executorService.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000, TimeUnit.MILLISECONDS); + } +} diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventRuleTransferCommon.java 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventRuleTransferCommon.java new file mode 100644 index 0000000..f4c1068 --- /dev/null +++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventRuleTransferCommon.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.eventbridge.adapter.benchmark; + + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class EventRuleTransferCommon extends AbstractEventCommon { + + private static final Logger logger = LoggerFactory.getLogger(EventRuleTransferCommon.class); + + private String transferFileName = System.getProperty("user.home") + "/transferBM.log"; + + public EventRuleTransferCommon() { + init(); + start(); + } + + private void init() { + executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-transfer-%d").build()); + + try { + printStream = new PrintStream( + Files.newOutputStream(Paths.get(transferFileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND), + false, + StandardCharsets.UTF_8.name()); + } catch (IOException e) { + throw new RuntimeException("Create outputStream: " + transferFileName + " for FileSinkTask failed", e); + } + } + + private void start() { + + executorService.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmarkCommon.createSnapshot()); + if (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000, TimeUnit.MILLISECONDS); + + executorService.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000, TimeUnit.MILLISECONDS); + } +} diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTargetTriggerCommon.java 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTargetTriggerCommon.java new file mode 100644 index 0000000..076a3db --- /dev/null +++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTargetTriggerCommon.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.eventbridge.adapter.benchmark; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class EventTargetTriggerCommon extends AbstractEventCommon { + + private static final Logger logger = LoggerFactory.getLogger(EventTargetTriggerCommon.class); + + private String triggerFileName = System.getProperty("user.home") + "/triggerBM.log"; + + public EventTargetTriggerCommon() { + init(); + start(); + } + + private void init() { + executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-trigger-%d").build()); + + try { + printStream = new PrintStream( + Files.newOutputStream(Paths.get(triggerFileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND), + false, + StandardCharsets.UTF_8.name()); + } catch (IOException e) { + throw new RuntimeException("Create outputStream: " + triggerFileName + " for FileSinkTask failed", e); + } + } + + private void start() { + + executorService.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmarkCommon.createSnapshot()); + if (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000, TimeUnit.MILLISECONDS); + + executorService.scheduleAtFixedRate(new TimerTask() { + + @Override + public void run() { + try { + printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000, TimeUnit.MILLISECONDS); + } +} diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/StatsBenchmarkCommon.java 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/StatsBenchmarkCommon.java new file mode 100644 index 0000000..afa9efb --- /dev/null +++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/StatsBenchmarkCommon.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Counters collected while benchmarking one processing stage.
 *
 * <p>Each counter is a {@link LongAdder}. A snapshot reads the counters one after another, so it
 * is not an atomic view across all three.
 */
public class StatsBenchmarkCommon {
    /** Number of batches processed successfully. */
    private final LongAdder successCount = new LongAdder();
    /** Number of batches that failed. */
    private final LongAdder failCount = new LongAdder();
    /** Number of individual records contained in the successful batches. */
    private final LongAdder recordCount = new LongAdder();

    /**
     * Captures the current time and counter values.
     *
     * @return a four-element array: {@code [0]} current time in milliseconds, {@code [1]} success
     *     count, {@code [2]} fail count, {@code [3]} record count
     */
    public Long[] createSnapshot() {
        final Long[] snapshot = new Long[4];
        snapshot[0] = System.currentTimeMillis();
        snapshot[1] = successCount.sum();
        snapshot[2] = failCount.sum();
        snapshot[3] = recordCount.sum();
        return snapshot;
    }

    /** @return the counter of successfully processed batches */
    public LongAdder getSuccessCount() {
        return this.successCount;
    }

    /** @return the counter of failed batches */
    public LongAdder getFailCount() {
        return this.failCount;
    }

    /** @return the counter of processed records */
    public LongAdder getRecordCount() {
        return this.recordCount;
    }
}
com.google.common.collect.Lists; import io.openmessaging.connector.api.data.ConnectRecord; import java.util.List; import org.apache.commons.collections.CollectionUtils; +import org.apache.rocketmq.eventbridge.adapter.benchmark.EventBusListenerCommon; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; @@ -59,9 +60,11 @@ public class EventBusListener extends ServiceThread { continue; } circulatorContext.offerEventRecords(pullRecordList); + circulatorContext.successCount(1,pullRecordList.size()); } catch (Exception exception) { logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception); pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception)); + circulatorContext.failCount(1); } } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 79dfd00..fdce488 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import javax.annotation.PostConstruct; import org.apache.commons.collections.MapUtils; +import org.apache.rocketmq.eventbridge.adapter.benchmark.EventBusListenerCommon; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine; @@ -107,10 +108,12 @@ public class EventRuleTransfer 
extends ServiceThread { } CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); + circulatorContext.successCount(2,afterTransformConnect.size()); logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); } catch (Exception exception) { logger.error("transfer event record failed, stackTrace-", exception); afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception)); + circulatorContext.failCount(2); } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index 85dae3b..ff13dad 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -76,9 +76,11 @@ public class EventTargetTrigger extends ServiceThread { try { sinkTask.put(triggerRecords); offsetManager.commit(triggerRecords); + circulatorContext.successCount(3,triggerRecords.size()); } catch (Exception exception) { logger.error(getServiceName() + " push target exception, stackTrace-", exception); triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception)); + circulatorContext.failCount(3); } }); } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java index 2763be3..5b39a50 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java +++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java @@ -23,8 +23,12 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.common.utils.ThreadUtils; -import org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext; +import org.apache.rocketmq.eventbridge.adapter.benchmark.AbstractEventCommon; +import org.apache.rocketmq.eventbridge.adapter.benchmark.EventBusListenerCommon; +import org.apache.rocketmq.eventbridge.adapter.benchmark.EventRuleTransferCommon; +import org.apache.rocketmq.eventbridge.adapter.benchmark.EventTargetTriggerCommon; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext; import org.apache.rocketmq.eventbridge.adapter.runtime.common.LoggerName; import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetKeyValue; import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; @@ -36,8 +40,10 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,6 +78,54 @@ public class CirculatorContext implements TargetRunnerListener { private Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new ConcurrentHashMap<>(10); + private AbstractEventCommon listenerCommon = null; + private AbstractEventCommon transferCommon = null; + private AbstractEventCommon triggerCommon = null; + + 
@Value("${rumtimer.benchmark.enable}") + private boolean enableBenchmark; + + @PostConstruct + private void enableBenchmark(){ + if (enableBenchmark){ + listenerCommon = new EventBusListenerCommon(); + transferCommon = new EventRuleTransferCommon(); + triggerCommon = new EventTargetTriggerCommon(); + } + } + + public void successCount(int type,int batchSize) { + if (enableBenchmark) { + switch (type) { + case 1: + listenerCommon.successCount(batchSize); + break; + case 2: + transferCommon.successCount(batchSize); + break; + case 3: + triggerCommon.successCount(batchSize); + break; + } + } + } + + public void failCount(int type) { + if (enableBenchmark) { + switch (type) { + case 1: + listenerCommon.failCount(); + break; + case 2: + transferCommon.failCount(); + break; + case 3: + triggerCommon.failCount(); + break; + } + } + } + /** * initial targetRunnerMap, taskTransformMap, pusherTaskMap * @param targetRunnerConfigs diff --git a/pom.xml b/pom.xml index 6eaac50..f711003 100644 --- a/pom.xml +++ b/pom.xml @@ -376,26 +376,26 @@ </execution> </executions> </plugin> - <plugin> - <artifactId>maven-checkstyle-plugin</artifactId> - <version>2.17</version> - <executions> - <execution> - <id>verify</id> - <phase>verify</phase> - <configuration> - <configLocation>style/rmq_checkstyle.xml</configLocation> - <encoding>UTF-8</encoding> - <consoleOutput>true</consoleOutput> - <failsOnError>true</failsOnError> - <includeTestSourceDirectory>false</includeTestSourceDirectory> - </configuration> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> +<!-- <plugin>--> +<!-- <artifactId>maven-checkstyle-plugin</artifactId>--> +<!-- <version>2.17</version>--> +<!-- <executions>--> +<!-- <execution>--> +<!-- <id>verify</id>--> +<!-- <phase>verify</phase>--> +<!-- <configuration>--> +<!-- <configLocation>style/rmq_checkstyle.xml</configLocation>--> +<!-- <encoding>UTF-8</encoding>--> +<!-- <consoleOutput>true</consoleOutput>--> +<!-- 
<failsOnError>true</failsOnError>--> +<!-- <includeTestSourceDirectory>false</includeTestSourceDirectory>--> +<!-- </configuration>--> +<!-- <goals>--> +<!-- <goal>check</goal>--> +<!-- </goals>--> +<!-- </execution>--> +<!-- </executions>--> +<!-- </plugin>--> <plugin> <groupId>org.jacoco</groupId> <artifactId>jacoco-maven-plugin</artifactId>
