[RocketMQ-58] Add integration test for RocketMQ, also thanks @fenglianghfl for this commit, closes apache/incubator-rocketmq#46
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/788771a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/788771a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/788771a8 Branch: refs/heads/master Commit: 788771a851512f1e588ebeb6952e61fe8d892804 Parents: 581039b Author: dongeforever <[email protected]> Authored: Fri Jan 20 11:36:31 2017 +0800 Committer: yukon <[email protected]> Committed: Fri Jan 20 11:36:31 2017 +0800 ---------------------------------------------------------------------- .travis.yml | 1 + BUILDING | 2 +- pom.xml | 73 +++-- test/pom.xml | 52 ++++ .../test/client/mq/MQAsyncProducer.java | 85 ++++++ .../test/client/rmq/RMQAsyncSendProducer.java | 226 ++++++++++++++ .../test/client/rmq/RMQBroadCastConsumer.java | 37 +++ .../test/client/rmq/RMQNormalConsumer.java | 90 ++++++ .../test/client/rmq/RMQNormalProducer.java | 167 ++++++++++ .../clientinterface/AbstractMQConsumer.java | 112 +++++++ .../clientinterface/AbstractMQProducer.java | 151 +++++++++ .../test/clientinterface/MQCollector.java | 108 +++++++ .../test/clientinterface/MQConsumer.java | 26 ++ .../test/clientinterface/MQProducer.java | 30 ++ .../rocketmq/test/factory/ConsumerFactory.java | 45 +++ .../rocketmq/test/factory/MQMessageFactory.java | 128 ++++++++ .../rocketmq/test/factory/MessageFactory.java | 62 ++++ .../rocketmq/test/factory/ProducerFactory.java | 37 +++ .../test/factory/SendCallBackFactory.java | 35 +++ .../rocketmq/test/factory/TagMessage.java | 108 +++++++ .../test/listener/AbstractListener.java | 104 +++++++ .../rmq/concurrent/RMQDelayListner.java | 61 ++++ .../rmq/concurrent/RMQNormalListner.java | 70 +++++ .../listener/rmq/order/RMQOrderListener.java | 85 ++++++ .../rocketmq/test/message/MessageQueueMsg.java | 62 ++++ .../rocketmq/test/sendresult/SendResult.java | 62 ++++ .../apache/rocketmq/test/util/Condition.java | 23 ++ 
.../test/util/DuplicateMessageInfo.java | 137 +++++++++ .../org/apache/rocketmq/test/util/FileUtil.java | 108 +++++++ .../org/apache/rocketmq/test/util/MQAdmin.java | 164 ++++++++++ .../rocketmq/test/util/MQRandomUtils.java | 28 ++ .../org/apache/rocketmq/test/util/MQWait.java | 93 ++++++ .../apache/rocketmq/test/util/RandomUtil.java | 306 +++++++++++++++++++ .../apache/rocketmq/test/util/RandomUtils.java | 91 ++++++ .../org/apache/rocketmq/test/util/TestUtil.java | 122 ++++++++ .../apache/rocketmq/test/util/TestUtils.java | 50 +++ .../apache/rocketmq/test/util/VerifyUtils.java | 146 +++++++++ .../test/util/data/collect/DataCollector.java | 45 +++ .../util/data/collect/DataCollectorManager.java | 112 +++++++ .../test/util/data/collect/DataFilter.java | 22 ++ .../collect/impl/ListDataCollectorImpl.java | 95 ++++++ .../data/collect/impl/MapDataCollectorImpl.java | 111 +++++++ .../test/util/parallel/ParallelTask.java | 43 +++ .../util/parallel/ParallelTaskExecutor.java | 67 ++++ .../rocketmq/test/util/parallel/Task4Test.java | 30 ++ .../org/apache/rocketmq/test/base/BaseConf.java | 162 ++++++++++ .../rocketmq/test/base/IntegrationTestBase.java | 143 +++++++++ .../balance/NormalMsgDynamicBalanceIT.java | 111 +++++++ .../balance/NormalMsgStaticBalanceIT.java | 109 +++++++ .../consumer/broadcast/BaseBroadCastIT.java | 56 ++++ .../normal/BroadCastNormalMsgNotRecvIT.java | 73 +++++ .../normal/BroadCastNormalMsgRecvCrashIT.java | 90 ++++++ .../normal/BroadCastNormalMsgRecvFailIT.java | 72 +++++ .../BroadCastNormalMsgRecvStartLaterIT.java | 88 ++++++ .../BroadCastNormalMsgTwoDiffGroupRecvIT.java | 78 +++++ .../normal/NormalMsgTwoSameGroupConsumerIT.java | 78 +++++ .../broadcast/order/OrderMsgBroadCastIT.java | 75 +++++ .../tag/BroadCastTwoConsumerFilterIT.java | 78 +++++ .../tag/BroadCastTwoConsumerSubDiffTagIT.java | 75 +++++ .../tag/BroadCastTwoConsumerSubTagIT.java | 75 +++++ .../consumer/cluster/DynamicAddAndCrashIT.java | 103 +++++++ 
.../consumer/cluster/DynamicAddConsumerIT.java | 97 ++++++ .../cluster/DynamicCrashConsumerIT.java | 100 ++++++ .../test/client/consumer/tag/MulTagSubIT.java | 156 ++++++++++ .../consumer/tag/TagMessageWith1ConsumerIT.java | 197 ++++++++++++ .../tag/TagMessageWithMulConsumerIT.java | 196 ++++++++++++ .../tag/TagMessageWithSameGroupConsumerIT.java | 115 +++++++ .../consumer/topic/MulConsumerMulTopicIT.java | 108 +++++++ .../consumer/topic/OneConsumerMulTopicIT.java | 104 +++++++ .../producer/async/AsyncSendExceptionIT.java | 150 +++++++++ .../async/AsyncSendWithMessageQueueIT.java | 85 ++++++ .../AsyncSendWithMessageQueueSelectorIT.java | 106 +++++++ .../async/AsyncSendWithOnlySendCallBackIT.java | 64 ++++ .../producer/exception/msg/ChinaPropIT.java | 74 +++++ .../exception/msg/MessageExceptionIT.java | 129 ++++++++ .../exception/msg/MessageUserPropIT.java | 94 ++++++ .../ProducerGroupAndInstanceNameValidityIT.java | 69 +++++ .../producer/oneway/OneWaySendExceptionIT.java | 78 +++++ .../client/producer/oneway/OneWaySendIT.java | 64 ++++ .../producer/oneway/OneWaySendWithMQIT.java | 79 +++++ .../oneway/OneWaySendWithSelectorIT.java | 104 +++++++ .../order/OrderMsgDynamicRebalanceIT.java | 115 +++++++ .../test/client/producer/order/OrderMsgIT.java | 108 +++++++ .../producer/order/OrderMsgRebalanceIT.java | 133 ++++++++ .../producer/order/OrderMsgWithTagIT.java | 169 ++++++++++ .../querymsg/QueryMsgByIdExceptionIT.java | 81 +++++ .../producer/querymsg/QueryMsgByIdIT.java | 75 +++++ .../producer/querymsg/QueryMsgByKeyIT.java | 104 +++++++ .../apache/rocketmq/test/delay/DelayConf.java | 27 ++ .../rocketmq/test/delay/NormalMsgDelayIT.java | 116 +++++++ .../test/smoke/NormalMessageSendAndRecvIT.java | 62 ++++ test/src/test/resources/log4j.xml | 46 +++ test/src/test/resources/logback-test.xml | 33 ++ 93 files changed, 8492 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index dd57ba3..916cac5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,4 +39,5 @@ script: - travis_retry mvn -B package jacoco:report coveralls:report after_success: + - mvn clean install -Pit-test - mvn sonar:sonar http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/BUILDING ---------------------------------------------------------------------- diff --git a/BUILDING b/BUILDING index a92cbd5..1498b3e 100644 --- a/BUILDING +++ b/BUILDING @@ -34,4 +34,4 @@ Then, import to eclipse by specifying the root directory of the project via: Execute the following command in order to build the tar.gz packages and install JAR to the local repository: -$ mvn clean package install -Prelease-all assembly:assembly -U \ No newline at end of file +$ mvn clean install -Prelease-all assembly:assembly -U \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8ec8170..8b8c468 100644 --- a/pom.xml +++ b/pom.xml @@ -160,9 +160,6 @@ <!-- Compiler settings properties --> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> - - <!-- Overwritten by the test configuration,otherwise the JaCoCo agent cannot be attached.Details see http://www.eclemma.org/jacoco/trunk/doc/prepare-agent-mojo.html --> - <argLine>-Xms512m -Xmx1024m</argLine> <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin> <!-- URL of the ASF SonarQube server --> <sonar.host.url>https://builds.apache.org/analysis</sonar.host.url> @@ -181,6 +178,7 @@ <module>example</module> <module>filtersrv</module> <module>srvutil</module> + <module>test</module> </modules> <build> @@ -237,25 +235,6 @@ </configuration> 
</plugin> <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.19.1</version> - <configuration> - <forkCount>1</forkCount> - <reuseForks>true</reuseForks> - <includes> - <include>**/*Test.java</include> - </includes> - </configuration> - </plugin> - <plugin> - <artifactId>maven-failsafe-plugin</artifactId> - <version>2.19.1</version> - <configuration> - <forkCount>1</forkCount> - <reuseForks>true</reuseForks> - </configuration> - </plugin> - <plugin> <artifactId>maven-javadoc-plugin</artifactId> <version>2.10.4</version> <configuration> @@ -310,6 +289,7 @@ <encoding>UTF-8</encoding> <consoleOutput>true</consoleOutput> <failsOnError>true</failsOnError> + <includeTestSourceDirectory>false</includeTestSourceDirectory> </configuration> <goals> <goal>check</goal> @@ -352,12 +332,20 @@ <goals> <goal>prepare-agent</goal> </goals> + <configuration> + <destFile>${project.build.directory}/jacoco.exec</destFile> + </configuration> </execution> <execution> <id>default-prepare-agent-integration</id> + <phase>pre-integration-test</phase> <goals> <goal>prepare-agent-integration</goal> </goals> + <configuration> + <destFile>${project.build.directory}/jacoco-it.exec</destFile> + <propertyName>failsafeArgLine</propertyName> + </configuration> </execution> <execution> <id>default-report</id> @@ -374,6 +362,14 @@ </executions> </plugin> <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <forkCount>1</forkCount> + <reuseForks>true</reuseForks> + </configuration> + </plugin> + <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>findbugs-maven-plugin</artifactId> <version>3.0.4</version> @@ -475,6 +471,37 @@ </plugins> </build> </profile> + <profile> + <id>it-test</id> + <build> + <plugins> + <plugin> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <forkCount>1</forkCount> + <reuseForks>true</reuseForks> + <argLine>@{failsafeArgLine}</argLine> + <excludes> + 
<exclude>**/NormalMsgDelayIT.java</exclude> + <exclude>**/BroadCastNormalMsgNotRecvIT.java</exclude> + <exclude>**/TagMessageWithSameGroupConsumerIT.java</exclude> + <exclude>**/AsyncSendWithMessageQueueSelectorIT.java</exclude> + <exclude>**/AsyncSendWithMessageQueueIT.java</exclude> + </excludes> + </configuration> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> </profiles> <dependencies> @@ -537,7 +564,7 @@ </dependency> <dependency> <groupId>${project.groupId}</groupId> - <artifactId>rocketmq-qatest</artifactId> + <artifactId>rocketmq-test</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml new file mode 100644 index 0000000..09ec9d3 --- /dev/null +++ b/test/pom.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-all</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>rocketmq-test</artifactId> + + + <dependencies> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-broker</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-namesrv</artifactId> + </dependency> + <dependency> + <groupId>com.google.truth</groupId> + <artifactId>truth</artifactId> + <version>0.30</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java new file mode 100644 index 0000000..6b2357b --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.mq; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; +import org.apache.rocketmq.test.util.TestUtil; + +public class MQAsyncProducer { + private static Logger logger = Logger.getLogger(MQAsyncProducer.class); + private AbstractMQProducer producer = null; + private long msgNum; + private int intervalMills; + private Thread sendT; + private AtomicBoolean bPause = new AtomicBoolean(false); + + public MQAsyncProducer(final AbstractMQProducer producer, final long msgNum, + final int intervalMills) { + this.producer = producer; + this.msgNum = msgNum; + this.intervalMills = intervalMills; + + sendT = new Thread(new Runnable() { + public void run() { + for (int i = 0; i < msgNum; i++) { + if (!bPause.get()) { + producer.send(); + TestUtil.waitForMonment(intervalMills); + } else { + while (true) { + if (bPause.get()) { + TestUtil.waitForMonment(10); + } else + break; + } + } + + } + } + }); + + } + + public void start() { + sendT.start(); + } + + public void waitSendAll(int waitMills) { + long startTime = System.currentTimeMillis(); + while ((producer.getAllMsgBody().size() + producer.getSendErrorMsg().size()) < msgNum) { + if (System.currentTimeMillis() - startTime < waitMills) { + TestUtil.waitForMonment(200); + } else { + logger.error(String.format("time elapse:%s, but the message sending has not finished", + System.currentTimeMillis() - startTime)); + break; + } + } + } + + public void pauseProducer() { + 
bPause.set(true); + } + + public void notifyProducer() { + bPause.set(false); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java new file mode 100644 index 0000000..4a2ce2b --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.client.rmq; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; +import org.apache.rocketmq.test.sendresult.SendResult; +import org.apache.rocketmq.test.util.RandomUtil; +import org.apache.rocketmq.test.util.TestUtil; + +public class RMQAsyncSendProducer extends AbstractMQProducer { + private static Logger logger = Logger + .getLogger(RMQAsyncSendProducer.class); + private String nsAddr = null; + private DefaultMQProducer producer = null; + private SendCallback sendCallback = null; + private List<org.apache.rocketmq.client.producer.SendResult> successSendResult = new ArrayList<org.apache.rocketmq.client.producer.SendResult>(); + private AtomicInteger exceptionMsgCount = new AtomicInteger( + 0); + private int msgSize = 0; + + public RMQAsyncSendProducer(String nsAddr, String topic) { + super(topic); + this.nsAddr = nsAddr; + sendCallback = new SendCallback() { + public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) { + successSendResult.add(sendResult); + } + + public void onException(Throwable throwable) { + exceptionMsgCount.getAndIncrement(); + } + }; + + create(); + start(); + } + + public int getSuccessMsgCount() { + return successSendResult.size(); + } + + public List<org.apache.rocketmq.client.producer.SendResult> getSuccessSendResult() { + return successSendResult; + } + + public int getExceptionMsgCount() { + return exceptionMsgCount.get(); + } + + private void create() { + producer = 
new DefaultMQProducer(); + producer.setProducerGroup(RandomUtil.getStringByUUID()); + producer.setInstanceName(RandomUtil.getStringByUUID()); + + if (nsAddr != null) { + producer.setNamesrvAddr(nsAddr); + } + + } + + private void start() { + try { + producer.start(); + } catch (MQClientException e) { + logger.error("producer start failed!"); + e.printStackTrace(); + } + } + + public SendResult send(Object msg, Object arg) { + return null; + } + + public void shutdown() { + producer.shutdown(); + } + + public void asyncSend(Object msg) { + Message metaqMsg = (Message) msg; + try { + producer.send(metaqMsg, sendCallback); + msgBodys.addData(new String(metaqMsg.getBody())); + originMsgs.addData(msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void asyncSend(int msgSize) { + this.msgSize = msgSize; + + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes()); + this.asyncSend(msg); + } + } + + public void asyncSend(Object msg, MessageQueueSelector selector, Object arg) { + Message metaqMsg = (Message) msg; + try { + producer.send(metaqMsg, selector, arg, sendCallback); + msgBodys.addData(new String(metaqMsg.getBody())); + originMsgs.addData(msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void asyncSend(int msgSize, MessageQueueSelector selector) { + this.msgSize = msgSize; + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes()); + this.asyncSend(msg, selector, i); + } + } + + public void asyncSend(Object msg, MessageQueue mq) { + Message metaqMsg = (Message) msg; + try { + producer.send(metaqMsg, mq, sendCallback); + msgBodys.addData(new String(metaqMsg.getBody())); + originMsgs.addData(msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void asyncSend(int msgSize, MessageQueue mq) { + this.msgSize = msgSize; + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes()); + this.asyncSend(msg, mq); + } + } + + public void waitForResponse(int timeoutMills) { + long startTime = System.currentTimeMillis(); + while (this.successSendResult.size() != this.msgSize) { + if (System.currentTimeMillis() - startTime < timeoutMills) { + TestUtil.waitForMonment(100); + } else { + logger.info("timeout but still not recv all response!"); + break; + } + } + } + + public void sendOneWay(Object msg) { + Message metaqMsg = (Message) msg; + try { + producer.sendOneway(metaqMsg); + msgBodys.addData(new String(metaqMsg.getBody())); + originMsgs.addData(msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void sendOneWay(int msgSize) { + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes()); + this.sendOneWay(msg); + } + } + + public void sendOneWay(Object msg, MessageQueue mq) { + Message metaqMsg = (Message) msg; + try { + producer.sendOneway(metaqMsg, mq); + msgBodys.addData(new String(metaqMsg.getBody())); + originMsgs.addData(msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void sendOneWay(int msgSize, MessageQueue mq) { + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes()); + this.sendOneWay(msg, mq); + } + } + + public void sendOneWay(Object msg, MessageQueueSelector selector, Object arg) { + Message metaqMsg = (Message) msg; + try { + producer.sendOneway(metaqMsg, selector, arg); + msgBodys.addData(new String(metaqMsg.getBody())); + originMsgs.addData(msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void sendOneWay(int msgSize, MessageQueueSelector selector) { + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes()); + this.sendOneWay(msg, selector, i); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java new file mode 100644 index 0000000..8af49ea --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.client.rmq; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.test.listener.AbstractListener; + +public class RMQBroadCastConsumer extends RMQNormalConsumer { + private static Logger logger = Logger.getLogger(RMQBroadCastConsumer.class); + + public RMQBroadCastConsumer(String nsAddr, String topic, String subExpression, + String consumerGroup, AbstractListener listner) { + super(nsAddr, topic, subExpression, consumerGroup, listner); + } + + @Override + public void create() { + super.create(); + consumer.setMessageModel(MessageModel.BROADCASTING); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java new file mode 100644 index 0000000..3f185d3 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.rmq; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; +import org.apache.rocketmq.test.listener.AbstractListener; +import org.apache.rocketmq.test.util.RandomUtil; + +public class RMQNormalConsumer extends AbstractMQConsumer { + private static Logger logger = Logger.getLogger(RMQNormalConsumer.class); + protected DefaultMQPushConsumer consumer = null; + + public RMQNormalConsumer(String nsAddr, String topic, String subExpression, + String consumerGroup, AbstractListener listner) { + super(nsAddr, topic, subExpression, consumerGroup, listner); + } + + public AbstractListener getListner() { + return listner; + } + + public void setListner(AbstractListener listner) { + this.listner = listner; + } + + public void create() { + consumer = new DefaultMQPushConsumer(consumerGroup); + consumer.setInstanceName(RandomUtil.getStringByUUID()); + consumer.setNamesrvAddr(nsAddr); + try { + consumer.subscribe(topic, subExpression); + } catch (MQClientException e) { + logger.error("consumer subscribe failed!"); + e.printStackTrace(); + } + consumer.setMessageListener(listner); + } + + public void start() { + try { + consumer.start(); + logger.info(String.format("consumer[%s] started!", consumer.getConsumerGroup())); + } catch (MQClientException e) { + logger.error("consumer start failed!"); + e.printStackTrace(); + } + } + + public void subscribe(String topic, String subExpression) { + try { + consumer.subscribe(topic, subExpression); + } catch (MQClientException e) { + logger.error("consumer subscribe failed!"); + e.printStackTrace(); + } + } + + public void shutdown() { + consumer.shutdown(); + } + + @Override + public void clearMsg() { + 
this.listner.clearMsg(); + } + + public void restart() { + consumer.shutdown(); + create(); + start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java new file mode 100644 index 0000000..26b77fe --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.client.rmq; + +import java.util.List; +import java.util.Map; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; +import org.apache.rocketmq.test.sendresult.SendResult; + +public class RMQNormalProducer extends AbstractMQProducer { + private static Logger logger = Logger.getLogger(RMQNormalProducer.class); + private DefaultMQProducer producer = null; + private String nsAddr = null; + + public RMQNormalProducer(String nsAddr, String topic) { + super(topic); + this.nsAddr = nsAddr; + create(); + start(); + } + + public RMQNormalProducer(String nsAddr, String topic, String producerGroupName, + String producerInstanceName) { + super(topic); + this.producerGroupName = producerGroupName; + this.producerInstanceName = producerInstanceName; + this.nsAddr = nsAddr; + + create(); + start(); + } + + public DefaultMQProducer getProducer() { + return producer; + } + + public void setProducer(DefaultMQProducer producer) { + this.producer = producer; + } + + protected void create() { + producer = new DefaultMQProducer(); + producer.setProducerGroup(getProducerGroupName()); + producer.setInstanceName(getProducerInstanceName()); + + if (nsAddr != null) { + producer.setNamesrvAddr(nsAddr); + } + + } + + public void start() { + try { + producer.start(); + super.setStartSuccess(true); + } catch (MQClientException e) { + super.setStartSuccess(false); + logger.error("producer start failed!"); + e.printStackTrace(); + } + } + + public SendResult send(Object msg, Object orderKey) { + org.apache.rocketmq.client.producer.SendResult metaqResult = null; + Message metaqMsg = (Message) msg; + try { + long start = 
System.currentTimeMillis(); + metaqResult = producer.send(metaqMsg); + this.msgRTs.addData(System.currentTimeMillis() - start); + if (isDebug) { + logger.info(metaqResult); + } + sendResult.setMsgId(metaqResult.getMsgId()); + sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK)); + sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName()); + msgBodys.addData(new String(metaqMsg.getBody())); + originMsgs.addData(msg); + originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult); + } catch (Exception e) { + if (isDebug) { + e.printStackTrace(); + } + + sendResult.setSendResult(false); + sendResult.setSendException(e); + errorMsgs.addData(msg); + } + + return sendResult; + } + + public void send(Map<MessageQueue, List<Object>> msgs) { + for (MessageQueue mq : msgs.keySet()) { + send(msgs.get(mq), mq); + } + } + + public void send(List<Object> msgs, MessageQueue mq) { + for (Object msg : msgs) { + sendMQ((Message) msg, mq); + } + } + + public SendResult sendMQ(Message msg, MessageQueue mq) { + org.apache.rocketmq.client.producer.SendResult metaqResult = null; + try { + long start = System.currentTimeMillis(); + metaqResult = producer.send(msg, mq); + this.msgRTs.addData(System.currentTimeMillis() - start); + if (isDebug) { + logger.info(metaqResult); + } + sendResult.setMsgId(metaqResult.getMsgId()); + sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK)); + sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName()); + msgBodys.addData(new String(msg.getBody())); + originMsgs.addData(msg); + originMsgIndex.put(new String(msg.getBody()), metaqResult); + } catch (Exception e) { + if (isDebug) { + e.printStackTrace(); + } + + sendResult.setSendResult(false); + sendResult.setSendException(e); + errorMsgs.addData(msg); + } + + return sendResult; + } + + public void shutdown() { + producer.shutdown(); + } + + @Override + public List<MessageQueue> getMessageQueue() { + List<MessageQueue> mqs = 
null; + try { + mqs = producer.fetchPublishMessageQueues(topic); + } catch (MQClientException e) { + e.printStackTrace(); + } + return mqs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java new file mode 100644 index 0000000..a077129 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.clientinterface; + +import org.apache.rocketmq.test.listener.AbstractListener; + +public abstract class AbstractMQConsumer implements MQConsumer { + protected AbstractListener listner = null; + protected String nsAddr = null; + protected String topic = null; + protected String subExpression = null; + protected String consumerGroup = null; + protected boolean isDebug = false; + + public AbstractMQConsumer() { + } + + public AbstractMQConsumer(String nsAddr, String topic, String subExpression, + String consumerGroup, AbstractListener listner) { + this.topic = topic; + this.subExpression = subExpression; + this.consumerGroup = consumerGroup; + this.listner = listner; + this.nsAddr = nsAddr; + } + + public AbstractMQConsumer(String topic, String subExpression) { + this.topic = topic; + this.subExpression = subExpression; + } + + public void setDebug() { + if (listner != null) { + listner.setDebug(true); + } + + isDebug = true; + } + + public void setDebug(boolean isDebug) { + if (listner != null) { + listner.setDebug(isDebug); + } + + this.isDebug = isDebug; + } + + public void setSubscription(String topic, String subExpression) { + this.topic = topic; + this.subExpression = subExpression; + } + + public AbstractListener getListner() { + return listner; + } + + public void setListner(AbstractListener listner) { + this.listner = listner; + } + + public String getNsAddr() { + return nsAddr; + } + + public void setNsAddr(String nsAddr) { + this.nsAddr = nsAddr; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getSubExpression() { + return subExpression; + } + + public void setSubExpression(String subExpression) { + this.subExpression = subExpression; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public void 
clearMsg() { + listner.clearMsg(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java new file mode 100644 index 0000000..8201e63 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.clientinterface; + +import java.util.Date; +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.sendresult.SendResult; +import org.apache.rocketmq.test.util.RandomUtil; +import org.apache.rocketmq.test.util.TestUtil; + +public abstract class AbstractMQProducer extends MQCollector implements MQProducer { + protected String topic = null; + + protected SendResult sendResult = new SendResult(); + protected boolean startSuccess = false; + protected String producerGroupName = null; + protected String producerInstanceName = null; + protected boolean isDebug = false; + + public AbstractMQProducer(String topic) { + super(); + producerGroupName = RandomUtil.getStringByUUID(); + producerInstanceName = RandomUtil.getStringByUUID(); + this.topic = topic; + + } + + public AbstractMQProducer(String topic, String originMsgCollector, String msgBodyCollector) { + super(originMsgCollector, msgBodyCollector); + producerGroupName = RandomUtil.getStringByUUID(); + producerInstanceName = RandomUtil.getStringByUUID(); + this.topic = topic; + } + + public boolean isStartSuccess() { + return startSuccess; + } + + public void setStartSuccess(boolean startSuccess) { + this.startSuccess = startSuccess; + } + + public String getProducerInstanceName() { + return producerInstanceName; + } + + public void setProducerInstanceName(String producerInstanceName) { + this.producerInstanceName = producerInstanceName; + } + + public String getProducerGroupName() { + return producerGroupName; + } + + public void setProducerGroupName(String producerGroupName) { + this.producerGroupName = producerGroupName; + } + + public void setDebug() { + isDebug = true; + } + + public void setDebug(boolean isDebug) { + this.isDebug = isDebug; + } + + public void setRun() { + isDebug = false; + } + + public List<MessageQueue> getMessageQueue() { + return null; 
+ } + + private Object getMessage() { + return this.getMessageByTag(null); + } + + public Object getMessageByTag(String tag) { + Object objMsg = null; + if (this instanceof RMQNormalProducer) { + org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message( + topic, (RandomUtil.getStringByUUID() + "." + new Date()).getBytes()); + objMsg = msg; + if (tag != null) { + msg.setTags(tag); + } + } + return objMsg; + } + + public void send() { + Object msg = getMessage(); + send(msg, null); + } + + public void send(Object msg) { + send(msg, null); + } + + public void send(long msgNum) { + for (int i = 0; i < msgNum; i++) { + this.send(); + } + } + + public void send(long msgNum, int intervalMills) { + for (int i = 0; i < msgNum; i++) { + this.send(); + TestUtil.waitForMonment(intervalMills); + } + } + + public void send(String tag, int msgSize) { + for (int i = 0; i < msgSize; i++) { + Object msg = getMessageByTag(tag); + send(msg, null); + } + } + + public void send(String tag, int msgSize, int intervalMills) { + for (int i = 0; i < msgSize; i++) { + Object msg = getMessageByTag(tag); + send(msg, null); + TestUtil.waitForMonment(intervalMills); + } + } + + public void send(List<Object> msgs) { + for (Object msg : msgs) { + this.send(msg, null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java new file mode 100644 index 0000000..42d4b62 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.clientinterface; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.test.util.RandomUtil; +import org.apache.rocketmq.test.util.data.collect.DataCollector; +import org.apache.rocketmq.test.util.data.collect.DataCollectorManager; + +public abstract class MQCollector { + protected DataCollector msgBodys = null; + protected DataCollector originMsgs = null; + protected DataCollector errorMsgs = null; + protected Map<Object, Object> originMsgIndex = null; + protected Collection<Object> msgBodysCopy = null; + + protected DataCollector msgRTs = null; + + public MQCollector() { + msgBodys = DataCollectorManager.getInstance() + .fetchListDataCollector(RandomUtil.getStringByUUID()); + originMsgs = DataCollectorManager.getInstance() + .fetchListDataCollector(RandomUtil.getStringByUUID()); + errorMsgs = DataCollectorManager.getInstance() + .fetchListDataCollector(RandomUtil.getStringByUUID()); + originMsgIndex = new ConcurrentHashMap<Object, Object>(); + msgRTs = DataCollectorManager.getInstance() + .fetchListDataCollector(RandomUtil.getStringByUUID()); + } + + public MQCollector(String originMsgCollector, String 
msgBodyCollector) { + originMsgs = DataCollectorManager.getInstance().fetchDataCollector(originMsgCollector); + msgBodys = DataCollectorManager.getInstance().fetchDataCollector(msgBodyCollector); + } + + public Collection<Object> getAllMsgBody() { + return msgBodys.getAllData(); + } + + public Collection<Object> getAllOriginMsg() { + return originMsgs.getAllData(); + } + + public Object getFirstMsg() { + return ((List<Object>) originMsgs.getAllData()).get(0); + } + + public Collection<Object> getAllUndupMsgBody() { + return msgBodys.getAllDataWithoutDuplicate(); + } + + public Collection<Object> getAllUndupOriginMsg() { + return originMsgs.getAllData(); + } + + public Collection<Object> getSendErrorMsg() { + return errorMsgs.getAllData(); + } + + public Collection<Object> getMsgRTs() { + return msgRTs.getAllData(); + } + + public Map<Object, Object> getOriginMsgIndex() { + return originMsgIndex; + } + + public Collection<Object> getMsgBodysCopy() { + msgBodysCopy = new ArrayList<Object>(); + msgBodysCopy.addAll(msgBodys.getAllData()); + return msgBodysCopy; + } + + public void clearMsg() { + msgBodys.resetData(); + originMsgs.resetData(); + errorMsgs.resetData(); + originMsgIndex.clear(); + msgRTs.resetData(); + } + + public void lockCollectors() { + msgBodys.lockIncrement(); + originMsgs.lockIncrement(); + errorMsgs.lockIncrement(); + msgRTs.lockIncrement(); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java new file mode 100644 index 0000000..aaa4b27 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
/**
 * Lifecycle operations shared by the test consumer wrappers.
 */
public interface MQConsumer {
    /** Prepares the consumer; {@code ConsumerFactory} calls this before {@link #start()}. */
    void create();

    /** Starts the consumer. */
    void start();

    /** Stops the consumer. */
    void shutdown();
}
/**
 * Operations shared by the test producer wrappers.
 */
public interface MQProducer {
    /**
     * Sends one message.
     *
     * @param msg the message to send
     * @param arg implementation-specific extra argument; may be unused by an implementation
     * @return the outcome of the send
     */
    SendResult send(Object msg, Object arg);

    /** Switches the producer into debug mode. */
    void setDebug();

    /** Switches the producer out of debug mode. */
    void setRun();

    /** Stops the producer. */
    void shutdown();
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.factory; + +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.listener.AbstractListener; + +public class ConsumerFactory { + + public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup, + String topic, String subExpression, + AbstractListener listner) { + RMQNormalConsumer consumer = new RMQNormalConsumer(nsAddr, topic, subExpression, + consumerGroup, listner); + consumer.create(); + consumer.start(); + return consumer; + } + + public static RMQBroadCastConsumer getRMQBroadCastConsumer(String nsAddr, String consumerGroup, + String topic, String subExpression, + AbstractListener listner) { + RMQBroadCastConsumer consumer = new RMQBroadCastConsumer(nsAddr, topic, subExpression, + consumerGroup, listner); + consumer.create(); + consumer.start(); + return consumer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java new file mode 100644 index 0000000..f998fcb --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.factory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.util.RandomUtil; + +public class MQMessageFactory { + private static Integer index = 0; + + public static List<Object> getRMQMessage(String tag, String topic, int msgSize) { + List<Object> msgs = new ArrayList<Object>(); + for (int i = 0; i < msgSize; i++) { + msgs.add(new Message(topic, tag, RandomUtil.getStringByUUID().getBytes())); + } + + return msgs; + } + + public static List<Object> getRMQMessage(List<String> tags, String topic, int msgSize) { + List<Object> msgs = new ArrayList<Object>(); + for (int i = 0; i < msgSize; i++) { + for (String tag : tags) { + msgs.add(new Message(topic, tag, RandomUtil.getStringByUUID().getBytes())); + } + } + return msgs; + } + + public static List<Object> getMessageBody(List<Object> msgs) { + List<Object> msgBodys = new ArrayList<Object>(); + for (Object msg : msgs) { + msgBodys.add(new String(((Message) msg).getBody())); + } + + return msgBodys; + } + + public static Collection<Object> getMessage(Collection<Object>... 
msgs) { + Collection<Object> allMsgs = new ArrayList<Object>(); + for (Collection<Object> msg : msgs) { + allMsgs.addAll(msg); + } + return allMsgs; + } + + public static List<Object> getDelayMsg(String topic, int delayLevel, int msgSize) { + List<Object> msgs = new ArrayList<Object>(); + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes()); + msg.setDelayTimeLevel(delayLevel); + msgs.add(msg); + } + return msgs; + } + + public static List<Object> getKeyMsg(String topic, String key, int msgSize) { + List<Object> msgs = new ArrayList<Object>(); + for (int i = 0; i < msgSize; i++) { + Message msg = new Message(topic, null, key, RandomUtil.getStringByUUID().getBytes()); + msgs.add(msg); + } + return msgs; + } + + public static Map<MessageQueue, List<Object>> getMsgByMQ(MessageQueue mq, int msgSize) { + List<MessageQueue> mqs = new ArrayList<MessageQueue>(); + mqs.add(mq); + return getMsgByMQ(mqs, msgSize); + } + + public static Map<MessageQueue, List<Object>> getMsgByMQ(List<MessageQueue> mqs, int msgSize) { + return getMsgByMQ(mqs, msgSize, null); + } + + public static Map<MessageQueue, List<Object>> getMsgByMQ(List<MessageQueue> mqs, int msgSize, + String tag) { + Map<MessageQueue, List<Object>> msgs = new HashMap<MessageQueue, List<Object>>(); + for (MessageQueue mq : mqs) { + msgs.put(mq, getMsg(mq.getTopic(), msgSize, tag)); + } + return msgs; + } + + public static List<Object> getMsg(String topic, int msgSize) { + return getMsg(topic, msgSize, null); + } + + public static List<Object> getMsg(String topic, int msgSize, String tag) { + List<Object> msgs = new ArrayList<Object>(); + while (msgSize > 0) { + Message msg = new Message(topic, (index++).toString().getBytes()); + if (tag != null) { + msg.setTags(tag); + } + msgs.add(msg); + msgSize--; + } + + return msgs; + } + + public static List<MessageQueue> getMessageQueues(MessageQueue... 
mq) { + return Arrays.asList(mq); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java new file mode 100644 index 0000000..8f71700 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.factory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.util.RandomUtils; + +public class MessageFactory { + + public static Message getRandomMessage(String topic) { + return getStringMessage(topic, RandomUtils.getStringByUUID()); + } + + public static Message getStringMessage(String topic, String body) { + Message msg = new Message(topic, body.getBytes()); + return msg; + } + + public static Message getStringMessageByTag(String topic, String tags, String body) { + Message msg = new Message(topic, tags, body.getBytes()); + return msg; + } + + public static Message getRandomMessageByTag(String topic, String tags) { + return getStringMessageByTag(topic, tags, RandomUtils.getStringByUUID()); + } + + public static Collection<Message> getRandomMessageList(String topic, int size) { + List<Message> msgList = new ArrayList<Message>(); + for (int i = 0; i < size; i++) { + msgList.add(getRandomMessage(topic)); + } + return msgList; + } + + public static Collection<Message> getRandomMessageListByTag(String topic, String tags, int size) { + List<Message> msgList = new ArrayList<Message>(); + for (int i = 0; i < size; i++) { + msgList.add(getRandomMessageByTag(topic, tags)); + } + return msgList; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java new file mode 100644 index 0000000..66767cc --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.factory; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.test.util.RandomUtil; + +public class ProducerFactory { + + public static DefaultMQProducer getRMQProducer(String ns) { + DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID()); + producer.setNamesrvAddr(ns); + try { + producer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + } + + return producer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java new file mode 100644 index 0000000..64764c6 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.factory; + +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; + +public class SendCallBackFactory { + public static SendCallback getSendCallBack() { + return new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + } + + @Override + public void onException(Throwable throwable) { + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java b/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java new file mode 100644 index 0000000..b7eb6a6 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.factory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TagMessage { + private List<String> tags = null; + private String topic = null; + private int msgSize = 0; + private Map<String, List<Object>> rmqMsgs = new HashMap<String, List<Object>>(); + + public TagMessage(String tag, String topic, int msgSize) { + String[] tags = {tag}; + this.tags = Arrays.asList(tags); + this.topic = topic; + this.msgSize = msgSize; + + init(); + } + + public TagMessage(String[] tags, String topic, int msgSize) { + this(Arrays.asList(tags), topic, msgSize); + } + + public TagMessage(List<String> tags, String topic, int msgSize) { + this.tags = tags; + this.topic = topic; + this.msgSize = msgSize; + + init(); + } + + private void init() { + for (String tag : tags) { + List<Object> tagMsgs = MQMessageFactory.getRMQMessage(tag, topic, msgSize); + rmqMsgs.put(tag, tagMsgs); + } + } + + public List<Object> getMessageByTag(String tag) { + if (tags.contains(tag)) { + return rmqMsgs.get(tag); + } else { + return new ArrayList<Object>(); + } + } + + public List<Object> getMixedTagMessages() { + List<Object> mixedMsgs = new ArrayList<Object>(); + for (int i = 0; i < msgSize; i++) { + for (String tag : tags) { + mixedMsgs.add(rmqMsgs.get(tag).get(i)); + } + } + + return mixedMsgs; 
+ } + + public List<Object> getMessageBodyByTag(String tag) { + if (tags.contains(tag)) { + return MQMessageFactory.getMessageBody(rmqMsgs.get(tag)); + } else { + return new ArrayList<Object>(); + } + } + + public List<Object> getMessageBodyByTag(String... tag) { + return this.getMessageBodyByTag(Arrays.asList(tag)); + } + + public List<Object> getMessageBodyByTag(List<String> tags) { + List<Object> msgBodys = new ArrayList<Object>(); + for (String tag : tags) { + msgBodys.addAll(MQMessageFactory.getMessageBody(rmqMsgs.get(tag))); + } + return msgBodys; + } + + public List<Object> getAllTagMessageBody() { + List<Object> msgs = new ArrayList<Object>(); + for (String tag : tags) { + msgs.addAll(MQMessageFactory.getMessageBody(rmqMsgs.get(tag))); + } + + return msgs; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java new file mode 100644 index 0000000..974434a --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.listener; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.test.clientinterface.MQCollector; +import org.apache.rocketmq.test.util.TestUtil; + +public class AbstractListener extends MQCollector implements MessageListener { + public static Logger logger = Logger.getLogger(AbstractListener.class); + protected boolean isDebug = false; + protected String listnerName = null; + protected Collection<Object> allSendMsgs = null; + + public AbstractListener() { + super(); + } + + public AbstractListener(String listnerName) { + super(); + this.listnerName = listnerName; + } + + public AbstractListener(String originMsgCollector, String msgBodyCollector) { + super(originMsgCollector, msgBodyCollector); + } + + public boolean isDebug() { + return isDebug; + } + + public void setDebug(boolean debug) { + isDebug = debug; + } + + public void waitForMessageConsume(int timeoutMills) { + TestUtil.waitForMonment(timeoutMills); + } + + public void stopRecv() { + super.lockCollectors(); + } + + public Collection<Object> waitForMessageConsume(Collection<Object> allSendMsgs, + int timeoutMills) { + this.allSendMsgs = allSendMsgs; + List<Object> sendMsgs = new ArrayList<Object>(); + sendMsgs.addAll(allSendMsgs); + + long curTime = System.currentTimeMillis(); + while (!sendMsgs.isEmpty()) { + Iterator<Object> iter = 
sendMsgs.iterator(); + while (iter.hasNext()) { + Object msg = iter.next(); + if (msgBodys.getAllData().contains(msg)) { + iter.remove(); + } + } + if (sendMsgs.isEmpty()) { + break; + } else { + if (System.currentTimeMillis() - curTime >= timeoutMills) { + logger.error(String.format("timeout but [%s] not recv all send messages!", + listnerName)); + break; + } else { + logger.info(String.format("[%s] still [%s] msg not recv!", listnerName, + sendMsgs.size())); + TestUtil.waitForMonment(500); + } + } + } + + return sendMsgs; + } + + public void waitForMessageConsume(Map<Object, Object> sendMsgIndex, int timeoutMills) { + Collection<Object> notRecvMsgs = waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills); + for (Object object : notRecvMsgs) { + logger.info(sendMsgIndex.get(object)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java new file mode 100644 index 0000000..b4a0870 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.listener.rmq.concurrent; + +import java.util.Collection; +import java.util.List; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.test.listener.AbstractListener; +import org.apache.rocketmq.test.util.RandomUtil; +import org.apache.rocketmq.test.util.data.collect.DataCollector; +import org.apache.rocketmq.test.util.data.collect.DataCollectorManager; + +public class RMQDelayListner extends AbstractListener implements MessageListenerConcurrently { + private DataCollector msgDelayTimes = null; + + public RMQDelayListner() { + msgDelayTimes = DataCollectorManager.getInstance() + .fetchDataCollector(RandomUtil.getStringByUUID()); + } + + public Collection<Object> getMsgDelayTimes() { + return msgDelayTimes.getAllData(); + } + + public void resetMsgDelayTimes() { + msgDelayTimes.resetData(); + } + + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext consumeConcurrentlyContext) { + long recvTime = System.currentTimeMillis(); + for (MessageExt msg : msgs) { + if (isDebug) { + logger.info(listnerName + ":" + msg); + } + + msgBodys.addData(new String(msg.getBody())); + originMsgs.addData(msg); + msgDelayTimes.addData(Math.abs(recvTime - msg.getBornTimestamp())); + } + return 
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java new file mode 100644 index 0000000..0d40881 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.listener.rmq.concurrent; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.test.listener.AbstractListener; + +public class RMQNormalListner extends AbstractListener implements MessageListenerConcurrently { + private ConsumeConcurrentlyStatus consumeStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + private AtomicInteger msgIndex = new AtomicInteger(0); + + public RMQNormalListner() { + super(); + } + + public RMQNormalListner(String listnerName) { + super(listnerName); + } + + public RMQNormalListner(ConsumeConcurrentlyStatus consumeStatus) { + super(); + this.consumeStatus = consumeStatus; + } + + public RMQNormalListner(String originMsgCollector, String msgBodyCollector) { + super(originMsgCollector, msgBodyCollector); + } + + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext consumeConcurrentlyContext) { + for (MessageExt msg : msgs) { + msgIndex.getAndIncrement(); + if (isDebug) { + if (listnerName != null && listnerName != "") { + logger.info(listnerName + ":" + msgIndex.get() + ":" + + String.format("msgid:%s broker:%s queueId:%s offset:%s", + msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), + msg.getQueueOffset())); + } else { + logger.info(msg); + } + } + + msgBodys.addData(new String(msg.getBody())); + originMsgs.addData(msg); + originMsgIndex.put(new String(msg.getBody()), msg); + } + return consumeStatus; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java 
---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java new file mode 100644 index 0000000..91883d8 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.test.listener.rmq.order; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.test.listener.AbstractListener; + +public class RMQOrderListener extends AbstractListener implements MessageListenerOrderly { + private Map<String/* brokerId_brokerIp */, Collection<Object>> msgs = new ConcurrentHashMap<String, Collection<Object>>(); + + public RMQOrderListener() { + super(); + } + + public RMQOrderListener(String listnerName) { + super(listnerName); + } + + public RMQOrderListener(String originMsgCollector, String msgBodyCollector) { + super(originMsgCollector, msgBodyCollector); + } + + public Collection<Collection<Object>> getMsgs() { + return msgs.values(); + } + + private void putMsg(MessageExt msg) { + Collection<Object> msgQueue = null; + String key = getKey(msg.getQueueId(), msg.getStoreHost().toString()); + if (!msgs.containsKey(key)) { + msgQueue = new ArrayList<Object>(); + } else { + msgQueue = msgs.get(key); + } + + msgQueue.add(new String(msg.getBody())); + msgs.put(key, msgQueue); + } + + private String getKey(int queueId, String brokerIp) { + return String.format("%s_%s", queueId, brokerIp); + } + + public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeOrderlyContext context) { + for (MessageExt msg : msgs) { + if (isDebug) { + if (listnerName != null && listnerName != "") { + logger.info(listnerName + ": " + msg); + } else { + logger.info(msg); + } + } + + putMsg(msg); + msgBodys.addData(new String(msg.getBody())); + originMsgs.addData(msg); + } + + return ConsumeOrderlyStatus.SUCCESS; + } +}
