This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop_oms_0.3.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop_oms_0.3.0 by this push:
new 7535889 Run tests
7535889 is described below
commit 753588947b782d1abfbde5998783895e6e7e3ddc
Author: shutian.lzh <[email protected]>
AuthorDate: Thu Apr 19 16:44:32 2018 +0800
Run tests
---
.../example/openmessaging/SimplePullConsumer.java | 36 ++++++++++++++++------
.../apache/rocketmq/example/simple/Producer.java | 2 +-
.../example/simple/PullScheduleService.java | 2 +-
.../rocketmq/consumer/PullConsumerImpl.java | 2 +-
4 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 4ddf50f..86aba41 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -21,38 +21,56 @@ import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
public class SimplePullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
+ messagingAccessPoint.startup();
+
+ final Producer producer = messagingAccessPoint.createProducer();
+
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- consumer.shutdown();
- messagingAccessPoint.shutdown();
- }
- }));
+ final String queueName = "TopicTest";
- consumer.attachQueue("OMS_HELLO_TOPIC");
+ producer.startup();
+ Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
+ producer.shutdown();
+
+ consumer.attachQueue(queueName);
consumer.startup();
System.out.printf("Consumer startup OK%n");
- while (true) {
+ // Keep running until we find the one that has just been sent
+ boolean stop = false;
+ while (!stop) {
Message message = consumer.receive();
if (message != null) {
String msgId =
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
+
+ if (!stop) {
+ stop = msgId.equalsIgnoreCase(sendResult.messageId());
+ }
+
+ } else {
+ System.out.printf("Return without any message%n");
}
}
+
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index 5751d22..7b504dd 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -29,7 +29,7 @@ public class Producer {
producer.start();
- for (int i = 0; i < 10000000; i++)
+ for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
index 151628f..8cfdd9b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
@@ -32,7 +32,7 @@ public class PullScheduleService {
final MQPullConsumerScheduleService scheduleService = new
MQPullConsumerScheduleService("GroupName1");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
- scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
+ scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 225b09e..2e22509 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -34,9 +34,9 @@ import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
--
To stop receiving notification emails like this one, please contact
[email protected].