This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2169e3c Make code compatible to OMS 0.3.0
2169e3c is described below
commit 2169e3c075e48f8f016337c9b81eafc258b9b9b1
Author: shutian.lzh <[email protected]>
AuthorDate: Sun Apr 15 16:13:35 2018 +0800
Make code compatible to OMS 0.3.0
---
.../org/apache/rocketmq/broker/BrokerStartup.java | 1 +
.../rocketmq/client/log/ClientLoggerTest.java | 5 +-
.../example/openmessaging/SimpleProducer.java | 54 ++++-----
.../example/openmessaging/SimplePullConsumer.java | 54 ++++++---
.../example/openmessaging/SimplePushConsumer.java | 19 ++-
.../apache/rocketmq/example/simple/Producer.java | 2 +-
.../example/simple/PullScheduleService.java | 2 +-
.../rocketmq/MessagingAccessPointImpl.java | 68 ++++-------
.../rocketmq/config/ClientConfig.java | 128 ++++++++++-----------
.../rocketmq/consumer/LocalMessageCache.java | 12 +-
.../rocketmq/consumer/PullConsumerImpl.java | 44 ++++---
.../rocketmq/consumer/PushConsumerImpl.java | 60 +++++++---
.../rocketmq/domain/BytesMessageImpl.java | 48 ++++----
.../rocketmq/domain/RocketMQConstants.java | 7 ++
.../rocketmq/domain/SendResultImpl.java | 3 +-
.../rocketmq/producer/AbstractOMSProducer.java | 27 ++---
.../rocketmq/producer/ProducerImpl.java | 45 ++++++--
.../rocketmq/producer/SequenceProducerImpl.java | 95 ---------------
.../rocketmq/promise/DefaultPromise.java | 15 +--
.../io/openmessaging/rocketmq/utils/BeanUtils.java | 2 +-
.../io/openmessaging/rocketmq/utils/OMSUtil.java | 62 +++++-----
.../rocketmq/consumer/PullConsumerImplTest.java | 24 ++--
.../rocketmq/consumer/PushConsumerImplTest.java | 18 ++-
.../rocketmq/producer/ProducerImplTest.java | 16 +--
.../producer/SequenceProducerImplTest.java | 86 --------------
.../rocketmq/promise/DefaultPromiseTest.java | 38 ++----
.../rocketmq/utils/BeanUtilsTest.java | 4 +-
pom.xml | 2 +-
28 files changed, 392 insertions(+), 549 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index f0a1150..1fc1b3b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -70,6 +70,7 @@ public class BrokerStartup {
}
log.info(tip);
+ System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
diff --git a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
index 9fe0d8b..4888186 100644
--- a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
@@ -49,7 +49,10 @@ public class ClientLoggerTest {
rocketmqCommon.info("common message {}", i, new
RuntimeException());
rocketmqRemoting.info("remoting message {}", i, new
RuntimeException());
}
-
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignore) {
+ }
String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log");
Assert.assertTrue(content.contains("testClientlog"));
Assert.assertTrue(content.contains("RocketmqClient"));
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 9d162ac..dbe1d10 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -16,19 +16,20 @@
*/
package org.apache.rocketmq.example.openmessaging;
+import io.openmessaging.Future;
+import io.openmessaging.FutureListener;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.Producer;
-import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
-import io.openmessaging.SendResult;
+import io.openmessaging.OMS;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
+import java.util.concurrent.CountDownLatch;
public class SimpleProducer {
public static void main(String[] args) {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ final MessagingAccessPoint messagingAccessPoint =
+
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final Producer producer = messagingAccessPoint.createProducer();
@@ -38,39 +39,40 @@ public class SimpleProducer {
producer.startup();
System.out.printf("Producer startup OK%n");
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- producer.shutdown();
- messagingAccessPoint.shutdown();
- }
- }));
-
{
- Message message =
producer.createBytesMessageToTopic("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ Message message = producer.createBytesMessage("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n",
sendResult.messageId());
}
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
{
- final Promise<SendResult> result =
producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
- result.addListener(new PromiseListener<SendResult>() {
- @Override
- public void operationCompleted(Promise<SendResult> promise) {
- System.out.printf("Send async message OK, msgId: %s%n",
promise.get().messageId());
- }
-
+ final Future<SendResult> result =
producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ result.addListener(new FutureListener<SendResult>() {
@Override
- public void operationFailed(Promise<SendResult> promise) {
- System.out.printf("Send async message Failed, error:
%s%n", promise.getThrowable().getMessage());
+ public void operationComplete(Future<SendResult> future) {
+ if (future.getThrowable() != null) {
+ System.out.printf("Send async message Failed, error:
%s%n", future.getThrowable().getMessage());
+ } else {
+ System.out.printf("Send async message OK, msgId:
%s%n", future.get().messageId());
+ }
+ countDownLatch.countDown();
}
});
}
{
-
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
+
+ try {
+ countDownLatch.await();
+ Thread.sleep(500); // Wait some time for one-way delivery.
+ } catch (InterruptedException ignore) {
+ }
+
+ producer.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 8e06772..86aba41 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -17,42 +17,60 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
-import io.openmessaging.PullConsumer;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
public class SimplePullConsumer {
public static void main(String[] args) {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ final MessagingAccessPoint messagingAccessPoint =
+
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
- final PullConsumer consumer =
messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
- OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"OMS_CONSUMER"));
+ messagingAccessPoint.startup();
+
+ final Producer producer = messagingAccessPoint.createProducer();
+
+ final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
+ OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- consumer.shutdown();
- messagingAccessPoint.shutdown();
- }
- }));
+ final String queueName = "TopicTest";
+
+ producer.startup();
+ Message msg = producer.createBytesMessage(queueName, "Hello Open
Messaging".getBytes());
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("Send Message OK. MsgId: %s%n",
sendResult.messageId());
+ producer.shutdown();
+
+ consumer.attachQueue(queueName);
consumer.startup();
System.out.printf("Consumer startup OK%n");
- while (true) {
- Message message = consumer.poll();
+ // Keep running until we find the one that has just been sent
+ boolean stop = false;
+ while (!stop) {
+ Message message = consumer.receive();
if (message != null) {
- String msgId =
message.headers().getString(MessageHeader.MESSAGE_ID);
+ String msgId =
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
+
+ if (!stop) {
+ stop = msgId.equalsIgnoreCase(sendResult.messageId());
+ }
+
+ } else {
+ System.out.printf("Return without any message%n");
}
}
+
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index b0935d4..220c132 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -17,22 +17,19 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.MessageListener;
+import io.openmessaging.consumer.PushConsumer;
public class SimplePushConsumer {
public static void main(String[] args) {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ final MessagingAccessPoint messagingAccessPoint = OMS
+
.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final PushConsumer consumer = messagingAccessPoint.
-
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"OMS_CONSUMER"));
+
createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID,
"OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
@@ -47,8 +44,8 @@ public class SimplePushConsumer {
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
- public void onMessage(final Message message, final
ReceivedMessageContext context) {
- System.out.printf("Received one message: %s%n",
message.headers().getString(MessageHeader.MESSAGE_ID));
+ public void onReceived(Message message, Context context) {
+ System.out.printf("Received one message: %s%n",
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
context.ack();
}
});
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index 5751d22..7b504dd 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -29,7 +29,7 @@ public class Producer {
producer.start();
- for (int i = 0; i < 10000000; i++)
+ for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
index 151628f..8cfdd9b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
@@ -32,7 +32,7 @@ public class PullScheduleService {
final MQPullConsumerScheduleService scheduleService = new
MQPullConsumerScheduleService("GroupName1");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
- scheduleService.registerPullTaskCallback("TopicTest1", new
PullTaskCallback() {
+ scheduleService.registerPullTaskCallback("TopicTest", new
PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index 65caf84..51388f9 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -16,24 +16,21 @@
*/
package io.openmessaging.rocketmq;
-import io.openmessaging.IterableConsumer;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.Producer;
-import io.openmessaging.PullConsumer;
-import io.openmessaging.PushConsumer;
import io.openmessaging.ResourceManager;
-import io.openmessaging.SequenceProducer;
-import io.openmessaging.ServiceEndPoint;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.consumer.PushConsumer;
+import io.openmessaging.consumer.StreamingConsumer;
import io.openmessaging.exception.OMSNotSupportedException;
-import io.openmessaging.observer.Observer;
+import io.openmessaging.producer.Producer;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl;
-import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
+
private final KeyValue accessPointProperties;
public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
@@ -41,11 +38,16 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
- public KeyValue properties() {
+ public KeyValue attributes() {
return accessPointProperties;
}
@Override
+ public String implVersion() {
+ return "0.3.0";
+ }
+
+ @Override
public Producer createProducer() {
return new ProducerImpl(this.accessPointProperties);
}
@@ -56,16 +58,6 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
- public SequenceProducer createSequenceProducer() {
- return new SequenceProducerImpl(this.accessPointProperties);
- }
-
- @Override
- public SequenceProducer createSequenceProducer(KeyValue properties) {
- return new
SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties,
properties));
- }
-
- @Override
public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
}
@@ -76,51 +68,31 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
- public PullConsumer createPullConsumer(String queueName) {
- return new PullConsumerImpl(queueName, accessPointProperties);
+ public PullConsumer createPullConsumer() {
+ return new PullConsumerImpl(accessPointProperties);
}
@Override
- public PullConsumer createPullConsumer(String queueName, KeyValue
properties) {
- return new PullConsumerImpl(queueName,
OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+ public PullConsumer createPullConsumer(KeyValue attributes) {
+ return new
PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
}
@Override
- public IterableConsumer createIterableConsumer(String queueName) {
- throw new OMSNotSupportedException("-1", "IterableConsumer is not
supported in current version");
+ public StreamingConsumer createStreamingConsumer() {
+ return null;
}
@Override
- public IterableConsumer createIterableConsumer(String queueName, KeyValue
properties) {
- throw new OMSNotSupportedException("-1", "IterableConsumer is not
supported in current version");
+ public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
+ return null;
}
@Override
- public ResourceManager getResourceManager() {
+ public ResourceManager resourceManager() {
throw new OMSNotSupportedException("-1", "ResourceManager is not
supported in current version.");
}
@Override
- public ServiceEndPoint createServiceEndPoint() {
- throw new OMSNotSupportedException("-1", "ServiceEndPoint is not
supported in current version.");
- }
-
- @Override
- public ServiceEndPoint createServiceEndPoint(KeyValue properties) {
- throw new OMSNotSupportedException("-1", "ServiceEndPoint is not
supported in current version.");
- }
-
- @Override
- public void addObserver(Observer observer) {
- //Ignore
- }
-
- @Override
- public void deleteObserver(Observer observer) {
- //Ignore
- }
-
- @Override
public void startup() {
//Ignore
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
index 7077c6d..a5dfe49 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
@@ -16,20 +16,20 @@
*/
package io.openmessaging.rocketmq.config;
-import io.openmessaging.PropertyKeys;
+import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
-public class ClientConfig implements PropertyKeys, NonStandardKeys {
- private String omsDriverImpl;
- private String omsAccessPoints;
- private String omsNamespace;
- private String omsProducerId;
- private String omsConsumerId;
- private int omsOperationTimeout = 5000;
- private String omsRoutingName;
- private String omsOperatorName;
- private String omsDstQueue;
- private String omsSrcTopic;
+public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
+ private String driverImpl;
+ private String accessPoints;
+ private String namespace;
+ private String producerId;
+ private String consumerId;
+ private int operationTimeout = 5000;
+ private String region;
+ private String routingSource;
+ private String routingDestination;
+ private String routingExpression;
private String rmqConsumerGroup;
private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
private int rmqMaxRedeliveryTimes = 16;
@@ -40,84 +40,60 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys {
private int rmqPullMessageBatchNums = 32;
private int rmqPullMessageCacheCapacity = 1000;
- public String getOmsDriverImpl() {
- return omsDriverImpl;
+ public String getDriverImpl() {
+ return driverImpl;
}
- public void setOmsDriverImpl(final String omsDriverImpl) {
- this.omsDriverImpl = omsDriverImpl;
+ public void setDriverImpl(final String driverImpl) {
+ this.driverImpl = driverImpl;
}
- public String getOmsAccessPoints() {
- return omsAccessPoints;
+ public String getAccessPoints() {
+ return accessPoints;
}
- public void setOmsAccessPoints(final String omsAccessPoints) {
- this.omsAccessPoints = omsAccessPoints;
+ public void setAccessPoints(final String accessPoints) {
+ this.accessPoints = accessPoints;
}
- public String getOmsNamespace() {
- return omsNamespace;
+ public String getNamespace() {
+ return namespace;
}
- public void setOmsNamespace(final String omsNamespace) {
- this.omsNamespace = omsNamespace;
+ public void setNamespace(final String namespace) {
+ this.namespace = namespace;
}
- public String getOmsProducerId() {
- return omsProducerId;
+ public String getProducerId() {
+ return producerId;
}
- public void setOmsProducerId(final String omsProducerId) {
- this.omsProducerId = omsProducerId;
+ public void setProducerId(final String producerId) {
+ this.producerId = producerId;
}
- public String getOmsConsumerId() {
- return omsConsumerId;
+ public String getConsumerId() {
+ return consumerId;
}
- public void setOmsConsumerId(final String omsConsumerId) {
- this.omsConsumerId = omsConsumerId;
+ public void setConsumerId(final String consumerId) {
+ this.consumerId = consumerId;
}
- public int getOmsOperationTimeout() {
- return omsOperationTimeout;
+ public int getOperationTimeout() {
+ return operationTimeout;
}
- public void setOmsOperationTimeout(final int omsOperationTimeout) {
- this.omsOperationTimeout = omsOperationTimeout;
+ public void setOperationTimeout(final int operationTimeout) {
+ this.operationTimeout = operationTimeout;
}
- public String getOmsRoutingName() {
- return omsRoutingName;
+ public String getRoutingSource() {
+ return routingSource;
}
- public void setOmsRoutingName(final String omsRoutingName) {
- this.omsRoutingName = omsRoutingName;
- }
-
- public String getOmsOperatorName() {
- return omsOperatorName;
- }
-
- public void setOmsOperatorName(final String omsOperatorName) {
- this.omsOperatorName = omsOperatorName;
- }
-
- public String getOmsDstQueue() {
- return omsDstQueue;
- }
-
- public void setOmsDstQueue(final String omsDstQueue) {
- this.omsDstQueue = omsDstQueue;
- }
-
- public String getOmsSrcTopic() {
- return omsSrcTopic;
- }
-
- public void setOmsSrcTopic(final String omsSrcTopic) {
- this.omsSrcTopic = omsSrcTopic;
+ public void setRoutingSource(final String routingSource) {
+ this.routingSource = routingSource;
}
public String getRmqConsumerGroup() {
@@ -191,4 +167,28 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys {
public void setRmqPullMessageCacheCapacity(final int
rmqPullMessageCacheCapacity) {
this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
}
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public String getRoutingDestination() {
+ return routingDestination;
+ }
+
+ public void setRoutingDestination(String routingDestination) {
+ this.routingDestination = routingDestination;
+ }
+
+ public String getRoutingExpression() {
+ return routingExpression;
+ }
+
+ public void setRoutingExpression(String routingExpression) {
+ this.routingExpression = routingExpression;
+ }
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
index cc1a515..93e60a7 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
-import io.openmessaging.PropertyKeys;
+import io.openmessaging.Message;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
@@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
class LocalMessageCache implements ServiceLifecycle {
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
@@ -91,13 +91,13 @@ class LocalMessageCache implements ServiceLifecycle {
}
MessageExt poll() {
- return poll(clientConfig.getOmsOperationTimeout());
+ return poll(clientConfig.getOperationTimeout());
}
MessageExt poll(final KeyValue properties) {
- int currentPollTimeout = clientConfig.getOmsOperationTimeout();
- if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
- currentPollTimeout =
properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+ int currentPollTimeout = clientConfig.getOperationTimeout();
+ if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) {
+ currentPollTimeout =
properties.getInt(Message.BuiltinKeys.TIMEOUT);
}
return poll(currentPollTimeout);
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index da4afdb..2e22509 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PullConsumer;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
@@ -34,28 +34,25 @@ import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
private boolean started = false;
- private String targetQueueName;
private final MQPullConsumerScheduleService pullConsumerScheduleService;
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
final static InternalLogger log = ClientLogger.getLog();
- public PullConsumerImpl(final String queueName, final KeyValue properties)
{
+ public PullConsumerImpl(final KeyValue properties) {
this.properties = properties;
- this.targetQueueName = queueName;
-
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
- String consumerGroup = clientConfig.getRmqConsumerGroup();
+ String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary
for RocketMQ, please set it.");
}
@@ -63,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer =
pullConsumerScheduleService.getDefaultMQPullConsumer();
- String accessPoints = clientConfig.getOmsAccessPoints();
+ String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or
empty.");
}
@@ -76,24 +73,42 @@ public class PullConsumerImpl implements PullConsumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
- properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+ properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.localMessageCache = new
LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
}
@Override
- public KeyValue properties() {
+ public KeyValue attributes() {
return properties;
}
@Override
- public Message poll() {
+ public PullConsumer attachQueue(String queueName) {
+ registerPullTaskCallback(queueName);
+ return this;
+ }
+
+ @Override
+ public PullConsumer attachQueue(String queueName, KeyValue attributes) {
+ registerPullTaskCallback(queueName);
+ return this;
+ }
+
+ @Override
+ public PullConsumer detachQueue(String queueName) {
+ this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
+ return this;
+ }
+
+ @Override
+ public Message receive() {
MessageExt rmqMsg = localMessageCache.poll();
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@Override
- public Message poll(final KeyValue properties) {
+ public Message receive(final KeyValue properties) {
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@@ -112,7 +127,6 @@ public class PullConsumerImpl implements PullConsumer {
public synchronized void startup() {
if (!started) {
try {
- registerPullTaskCallback();
this.pullConsumerScheduleService.start();
this.localMessageCache.startup();
} catch (MQClientException e) {
@@ -122,7 +136,7 @@ public class PullConsumerImpl implements PullConsumer {
this.started = true;
}
- private void registerPullTaskCallback() {
+ private void registerPullTaskCallback(final String targetQueueName) {
this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new
PullTaskCallback() {
@Override
public void doPullTask(final MessageQueue mq, final
PullTaskContext context) {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index f9b8058..9bfd7c8 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -18,12 +18,12 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
-import io.openmessaging.MessageListener;
import io.openmessaging.OMS;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.MessageListener;
+import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
@@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer {
this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
- String accessPoints = clientConfig.getOmsAccessPoints();
+ String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or
empty.");
}
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',',
';'));
- String consumerGroup = clientConfig.getRmqConsumerGroup();
+ String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary
for RocketMQ, please set it.");
}
@@ -70,13 +70,13 @@ public class PushConsumerImpl implements PushConsumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
- properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+ properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.registerMessageListener(new
MessageListenerImpl());
}
@Override
- public KeyValue properties() {
+ public KeyValue attributes() {
return properties;
}
@@ -91,6 +91,11 @@ public class PushConsumerImpl implements PushConsumer {
}
@Override
+ public void suspend(long timeout) {
+
+ }
+
+ @Override
public boolean isSuspended() {
return
this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
}
@@ -107,6 +112,32 @@ public class PushConsumerImpl implements PushConsumer {
}
@Override
+ public PushConsumer attachQueue(String queueName, MessageListener
listener, KeyValue attributes) {
+ return this.attachQueue(queueName, listener);
+ }
+
+ @Override
+ public PushConsumer detachQueue(String queueName) {
+ this.subscribeTable.remove(queueName);
+ try {
+ this.rocketmqPushConsumer.unsubscribe(queueName);
+ } catch (Exception e) {
+ throw new OMSRuntimeException("-1", String.format("RocketMQ push
consumer fails to unsubscribe topic: %s", queueName));
+ }
+ return null;
+ }
+
+ @Override
+ public void addInterceptor(ConsumerInterceptor interceptor) {
+
+ }
+
+ @Override
+ public void removeInterceptor(ConsumerInterceptor interceptor) {
+
+ }
+
+ @Override
public synchronized void startup() {
if (!started) {
try {
@@ -146,9 +177,9 @@ public class PushConsumerImpl implements PushConsumer {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
- ReceivedMessageContext context = new ReceivedMessageContext() {
+ MessageListener.Context context = new MessageListener.Context() {
@Override
- public KeyValue properties() {
+ public KeyValue attributes() {
return contextProperties;
}
@@ -158,16 +189,9 @@ public class PushConsumerImpl implements PushConsumer {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
-
- @Override
- public void ack(final KeyValue properties) {
- sync.countDown();
-
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-
properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
- }
};
long begin = System.currentTimeMillis();
- listener.onMessage(omsMsg, context);
+ listener.onReceived(omsMsg, context);
long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() *
60 * 1000;
try {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
index 43f80ae..702d561 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -23,13 +23,13 @@ import io.openmessaging.OMS;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class BytesMessageImpl implements BytesMessage {
- private KeyValue headers;
- private KeyValue properties;
+ private KeyValue sysHeaders;
+ private KeyValue userHeaders;
private byte[] body;
public BytesMessageImpl() {
- this.headers = OMS.newKeyValue();
- this.properties = OMS.newKeyValue();
+ this.sysHeaders = OMS.newKeyValue();
+ this.userHeaders = OMS.newKeyValue();
}
@Override
@@ -44,60 +44,60 @@ public class BytesMessageImpl implements BytesMessage {
}
@Override
- public KeyValue headers() {
- return headers;
+ public KeyValue sysHeaders() {
+ return sysHeaders;
}
@Override
- public KeyValue properties() {
- return properties;
+ public KeyValue userHeaders() {
+ return userHeaders;
}
@Override
- public Message putHeaders(final String key, final int value) {
- headers.put(key, value);
+ public Message putSysHeaders(String key, int value) {
+ sysHeaders.put(key, value);
return this;
}
@Override
- public Message putHeaders(final String key, final long value) {
- headers.put(key, value);
+ public Message putSysHeaders(String key, long value) {
+ sysHeaders.put(key, value);
return this;
}
@Override
- public Message putHeaders(final String key, final double value) {
- headers.put(key, value);
+ public Message putSysHeaders(String key, double value) {
+ sysHeaders.put(key, value);
return this;
}
@Override
- public Message putHeaders(final String key, final String value) {
- headers.put(key, value);
+ public Message putSysHeaders(String key, String value) {
+ sysHeaders.put(key, value);
return this;
}
@Override
- public Message putProperties(final String key, final int value) {
- properties.put(key, value);
+ public Message putUserHeaders(String key, int value) {
+ userHeaders.put(key, value);
return this;
}
@Override
- public Message putProperties(final String key, final long value) {
- properties.put(key, value);
+ public Message putUserHeaders(String key, long value) {
+ userHeaders.put(key, value);
return this;
}
@Override
- public Message putProperties(final String key, final double value) {
- properties.put(key, value);
+ public Message putUserHeaders(String key, double value) {
+ userHeaders.put(key, value);
return this;
}
@Override
- public Message putProperties(final String key, final String value) {
- properties.put(key, value);
+ public Message putUserHeaders(String key, String value) {
+ userHeaders.put(key, value);
return this;
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
new file mode 100644
index 0000000..4c6568a
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
@@ -0,0 +1,7 @@
+package io.openmessaging.rocketmq.domain;
+
+public interface RocketMQConstants {
+
+ String START_DELIVER_TIME = "__STARTDELIVERTIME";
+
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
index 228a9f0..85bcd68 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.domain;
import io.openmessaging.KeyValue;
-import io.openmessaging.SendResult;
+import io.openmessaging.producer.SendResult;
public class SendResultImpl implements SendResult {
private String messageId;
@@ -33,7 +33,6 @@ public class SendResultImpl implements SendResult {
return messageId;
}
- @Override
public KeyValue properties() {
return properties;
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index db25fc6..f733756 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -20,8 +20,7 @@ import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageFactory;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.PropertyKeys;
+import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSNotSupportedException;
@@ -53,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer = new DefaultMQProducer();
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
- String accessPoints = clientConfig.getOmsAccessPoints();
+ String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or
empty.");
}
@@ -61,10 +60,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
String producerId = buildInstanceName();
-
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
+
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
- properties.put(PropertyKeys.PRODUCER_ID, producerId);
+ properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
}
@Override
@@ -121,18 +120,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
}
@Override
- public BytesMessage createBytesMessageToTopic(final String topic, final
byte[] body) {
- BytesMessage bytesMessage = new BytesMessageImpl();
- bytesMessage.setBody(body);
- bytesMessage.headers().put(MessageHeader.TOPIC, topic);
- return bytesMessage;
- }
-
- @Override
- public BytesMessage createBytesMessageToQueue(final String queue, final
byte[] body) {
- BytesMessage bytesMessage = new BytesMessageImpl();
- bytesMessage.setBody(body);
- bytesMessage.headers().put(MessageHeader.QUEUE, queue);
- return bytesMessage;
+ public BytesMessage createBytesMessage(String queue, byte[] body) {
+ BytesMessage message = new BytesMessageImpl();
+ message.setBody(body);
+ message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
+ return message;
}
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index 2c00c60..c2b6d3e 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -19,12 +19,13 @@ package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.Producer;
import io.openmessaging.Promise;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.SendResult;
import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.interceptor.ProducerInterceptor;
+import io.openmessaging.producer.BatchMessageSender;
+import io.openmessaging.producer.LocalTransactionExecutor;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.producer.SendCallback;
@@ -39,7 +40,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
@Override
- public KeyValue properties() {
+ public KeyValue attributes() {
return properties;
}
@@ -50,11 +51,16 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public SendResult send(final Message message, final KeyValue properties) {
- long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
- ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) :
this.rocketmqProducer.getSendMsgTimeout();
+ long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
+ ? properties.getInt(Message.BuiltinKeys.TIMEOUT) :
this.rocketmqProducer.getSendMsgTimeout();
return send(message, timeout);
}
+ @Override
+ public SendResult send(Message message, LocalTransactionExecutor
branchExecutor, KeyValue attributes) {
+ return null;
+ }
+
private SendResult send(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage =
msgConvert((BytesMessage) message);
@@ -64,11 +70,11 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
log.error(String.format("Send message to RocketMQ failed, %s",
message));
throw new OMSRuntimeException("-1", "Send message to RocketMQ
broker failed.");
}
- message.headers().put(MessageHeader.MESSAGE_ID,
rmqResult.getMsgId());
+ message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID,
rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s",
message), e);
- throw checkProducerException(rmqMessage.getTopic(),
message.headers().getString(MessageHeader.MESSAGE_ID), e);
+ throw checkProducerException(rmqMessage.getTopic(),
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e);
}
}
@@ -79,8 +85,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue
properties) {
- long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
- ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) :
this.rocketmqProducer.getSendMsgTimeout();
+ long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
+ ? properties.getInt(Message.BuiltinKeys.TIMEOUT) :
this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}
@@ -92,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override
public void onSuccess(final
org.apache.rocketmq.client.producer.SendResult rmqResult) {
- message.headers().put(MessageHeader.MESSAGE_ID,
rmqResult.getMsgId());
+ message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID,
rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
}
@@ -121,4 +127,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
public void sendOneway(final Message message, final KeyValue properties) {
sendOneway(message);
}
+
+ @Override
+ public BatchMessageSender createBatchMessageSender() {
+ return null;
+ }
+
+ @Override
+ public void addInterceptor(ProducerInterceptor interceptor) {
+
+ }
+
+ @Override
+ public void removeInterceptor(ProducerInterceptor interceptor) {
+
+ }
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
deleted file mode 100644
index 05225cc..0000000
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.producer;
-
-import io.openmessaging.BytesMessage;
-import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.SequenceProducer;
-import io.openmessaging.rocketmq.utils.OMSUtil;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.rocketmq.client.Validators;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.SendResult;
-
-public class SequenceProducerImpl extends AbstractOMSProducer implements
SequenceProducer {
-
- private BlockingQueue<Message> msgCacheQueue;
-
- public SequenceProducerImpl(final KeyValue properties) {
- super(properties);
- this.msgCacheQueue = new LinkedBlockingQueue<>();
- }
-
- @Override
- public KeyValue properties() {
- return properties;
- }
-
- @Override
- public void send(final Message message) {
- checkMessageType(message);
- org.apache.rocketmq.common.message.Message rmqMessage =
OMSUtil.msgConvert((BytesMessage) message);
- try {
- Validators.checkMessage(rmqMessage, this.rocketmqProducer);
- } catch (MQClientException e) {
- throw checkProducerException(rmqMessage.getTopic(),
message.headers().getString(MessageHeader.MESSAGE_ID), e);
- }
- msgCacheQueue.add(message);
- }
-
- @Override
- public void send(final Message message, final KeyValue properties) {
- send(message);
- }
-
- @Override
- public synchronized void commit() {
- List<Message> messages = new ArrayList<>();
- msgCacheQueue.drainTo(messages);
-
- List<org.apache.rocketmq.common.message.Message> rmqMessages = new
ArrayList<>();
-
- for (Message message : messages) {
- rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
- }
-
- if (rmqMessages.size() == 0) {
- return;
- }
-
- try {
- SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
- String[] msgIdArray = sendResult.getMsgId().split(",");
- for (int i = 0; i < messages.size(); i++) {
- Message message = messages.get(i);
- message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
- }
- } catch (Exception e) {
- throw checkProducerException("", "", e);
- }
- }
-
- @Override
- public synchronized void rollback() {
- msgCacheQueue.clear();
- }
-}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
index 453b665..c1b5999 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
+import io.openmessaging.FutureListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -33,7 +33,7 @@ public class DefaultPromise<V> implements Promise<V> {
private long timeout;
private long createTime;
private Throwable exception = null;
- private List<PromiseListener<V>> promiseListenerList;
+ private List<FutureListener<V>> promiseListenerList;
public DefaultPromise() {
createTime = System.currentTimeMillis();
@@ -121,7 +121,7 @@ public class DefaultPromise<V> implements Promise<V> {
}
@Override
- public void addListener(final PromiseListener<V> listener) {
+ public void addListener(final FutureListener<V> listener) {
if (listener == null) {
throw new NullPointerException("FutureListener is null");
}
@@ -150,7 +150,7 @@ public class DefaultPromise<V> implements Promise<V> {
private void notifyListeners() {
if (promiseListenerList != null) {
- for (PromiseListener<V> listener : promiseListenerList) {
+ for (FutureListener<V> listener : promiseListenerList) {
notifyListener(listener);
}
}
@@ -199,12 +199,9 @@ public class DefaultPromise<V> implements Promise<V> {
return true;
}
- private void notifyListener(final PromiseListener<V> listener) {
+ private void notifyListener(final FutureListener<V> listener) {
try {
- if (exception != null)
- listener.operationFailed(this);
- else
- listener.operationCompleted(this);
+ listener.operationComplete(this);
} catch (Throwable t) {
LOG.error("notifyListener {} Error:{}",
listener.getClass().getSimpleName(), t);
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
index ba7cd59..ef9236f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -164,7 +164,7 @@ public final class BeanUtils {
final Set<String> keySet = properties.keySet();
for (String key : keySet) {
- String[] keyGroup = key.split("\\.");
+ String[] keyGroup = key.split("[\\._]");
for (int i = 0; i < keyGroup.length; i++) {
keyGroup[i] = keyGroup[i].toLowerCase();
keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
index 60c8408..2302141 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -18,11 +18,11 @@ package io.openmessaging.rocketmq.utils;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
-import io.openmessaging.MessageHeader;
+import io.openmessaging.Message.BuiltinKeys;
import io.openmessaging.OMS;
-import io.openmessaging.SendResult;
+import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Iterator;
@@ -48,25 +48,26 @@ public class OMSUtil {
org.apache.rocketmq.common.message.Message rmqMessage = new
org.apache.rocketmq.common.message.Message();
rmqMessage.setBody(omsMessage.getBody());
- KeyValue headers = omsMessage.headers();
- KeyValue properties = omsMessage.properties();
+ KeyValue sysHeaders = omsMessage.sysHeaders();
+ KeyValue userHeaders = omsMessage.userHeaders();
//All destinations in RocketMQ use Topic
- if (headers.containsKey(MessageHeader.TOPIC)) {
- rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
- rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION,
"TOPIC");
- } else {
- rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
- rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION,
"QUEUE");
+ rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
+
+ if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
+ long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
+ if (deliverTime > 0) {
+
rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME,
String.valueOf(deliverTime));
+ }
}
- for (String key : properties.keySet()) {
- MessageAccessor.putProperty(rmqMessage, key,
properties.getString(key));
+ for (String key : userHeaders.keySet()) {
+ MessageAccessor.putProperty(rmqMessage, key,
userHeaders.getString(key));
}
- //Headers has a high priority
- for (String key : headers.keySet()) {
- MessageAccessor.putProperty(rmqMessage, key,
headers.getString(key));
+ //System headers has a high priority
+ for (String key : sysHeaders.keySet()) {
+ MessageAccessor.putProperty(rmqMessage, key,
sysHeaders.getString(key));
}
return rmqMessage;
@@ -76,8 +77,8 @@ public class OMSUtil {
BytesMessage omsMsg = new BytesMessageImpl();
omsMsg.setBody(rmqMsg.getBody());
- KeyValue headers = omsMsg.headers();
- KeyValue properties = omsMsg.properties();
+ KeyValue headers = omsMsg.sysHeaders();
+ KeyValue properties = omsMsg.userHeaders();
final Set<Map.Entry<String, String>> entries =
rmqMsg.getProperties().entrySet();
@@ -89,25 +90,22 @@ public class OMSUtil {
}
}
- omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
- if
(!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
-
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC"))
{
- omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
- } else {
- omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
- }
- omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
- omsMsg.putHeaders(MessageHeader.BORN_HOST,
String.valueOf(rmqMsg.getBornHost()));
- omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP,
rmqMsg.getBornTimestamp());
- omsMsg.putHeaders(MessageHeader.STORE_HOST,
String.valueOf(rmqMsg.getStoreHost()));
- omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP,
rmqMsg.getStoreTimestamp());
+ omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId());
+
+ omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic());
+
+ omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
+ omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST,
String.valueOf(rmqMsg.getBornHost()));
+ omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP,
rmqMsg.getBornTimestamp());
+ omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST,
String.valueOf(rmqMsg.getStoreHost()));
+ omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP,
rmqMsg.getStoreTimestamp());
return omsMsg;
}
public static boolean isOMSHeader(String value) {
- for (Field field : MessageHeader.class.getDeclaredFields()) {
+ for (Field field : BuiltinKeys.class.getDeclaredFields()) {
try {
- if (field.get(MessageHeader.class).equals(value)) {
+ if (field.get(BuiltinKeys.class).equals(value)) {
return true;
}
} catch (IllegalAccessException e) {
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
index 843ddb7..da2e8a0 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -18,12 +18,10 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PullConsumer;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
@@ -50,18 +48,18 @@ public class PullConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ final MessagingAccessPoint messagingAccessPoint = OMS
+
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
- consumer = messagingAccessPoint.createPullConsumer(queueName,
- OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"TestGroup"));
+ consumer =
messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID,
"TestGroup"));
+ consumer.attachQueue(queueName);
Field field =
PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
field.set(consumer, rocketmqPullConsumer); //Replace
ClientConfig clientConfig = new ClientConfig();
- clientConfig.setOmsOperationTimeout(200);
+ clientConfig.setOperationTimeout(200);
localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer,
clientConfig));
field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
@@ -83,18 +81,18 @@ public class PullConsumerImplTest {
when(localMessageCache.poll()).thenReturn(consumedMsg);
- Message message = consumer.poll();
-
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+ Message message = consumer.receive();
+
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
}
@Test
public void testPoll_WithTimeout() {
//There is a default timeout value, @see
ClientConfig#omsOperationTimeout.
- Message message = consumer.poll();
+ Message message = consumer.receive();
assertThat(message).isNull();
- message =
consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+ message =
consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
assertThat(message).isNull();
}
}
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
index 882e57e..b55816b 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -18,13 +18,11 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessageListener;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.util.Collections;
@@ -49,10 +47,10 @@ public class PushConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ final MessagingAccessPoint messagingAccessPoint = OMS
+
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer(
- OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"TestGroup"));
+ OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
Field field =
PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true);
@@ -75,8 +73,8 @@ public class PushConsumerImplTest {
consumedMsg.setTopic("HELLO_QUEUE");
consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
@Override
- public void onMessage(final Message message, final
ReceivedMessageContext context) {
-
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+ public void onReceived(Message message, Context context) {
+
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage)
message).getBody()).isEqualTo(testBody);
context.ack();
}
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
index 1db80c3..fc6515e 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -17,9 +17,9 @@
package io.openmessaging.rocketmq.producer;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.Producer;
+import io.openmessaging.OMS;
import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.producer.Producer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -49,8 +49,8 @@ public class ProducerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ final MessagingAccessPoint messagingAccessPoint = OMS
+
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer();
Field field =
AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
@@ -67,8 +67,8 @@ public class ProducerImplTest {
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
- io.openmessaging.SendResult omsResult =
- producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+ io.openmessaging.producer.SendResult omsResult =
+ producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
@@ -80,7 +80,7 @@ public class ProducerImplTest {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try {
- producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+ producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
@@ -91,7 +91,7 @@ public class ProducerImplTest {
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try {
- producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+ producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
deleted file mode 100644
index 823fe01..0000000
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.producer;
-
-import io.openmessaging.BytesMessage;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.SequenceProducer;
-import java.lang.reflect.Field;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class SequenceProducerImplTest {
-
- private SequenceProducer producer;
-
- @Mock
- private DefaultMQProducer rocketmqProducer;
-
- @Before
- public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
- .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
- producer = messagingAccessPoint.createSequenceProducer();
-
- Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
- field.setAccessible(true);
- field.set(producer, rocketmqProducer);
-
- messagingAccessPoint.startup();
- producer.startup();
- }
-
- @Test
- public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
- SendResult sendResult = new SendResult();
- sendResult.setMsgId("TestMsgID");
- sendResult.setSendStatus(SendStatus.SEND_OK);
- when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
- when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
- final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
- producer.send(message);
- producer.commit();
- assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
- }
-
- @Test
- public void testRollback() {
- when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
- final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
- producer.send(message);
- producer.rollback();
- producer.commit(); //Commit nothing.
- assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
- }
-}
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
index 2240ff2..f226ede 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -16,8 +16,9 @@
*/
package io.openmessaging.rocketmq.promise;
+import io.openmessaging.Future;
+import io.openmessaging.FutureListener;
import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.junit.Before;
import org.junit.Test;
@@ -63,14 +64,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener() throws Exception {
- promise.addListener(new PromiseListener<String>() {
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationCompleted(final Promise<String> promise) {
+ public void operationComplete(Future<String> future) {
assertThat(promise.get()).isEqualTo("Done");
- }
-
- @Override
- public void operationFailed(final Promise<String> promise) {
}
});
@@ -80,15 +77,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_ListenerAfterSet() throws Exception {
promise.set("Done");
- promise.addListener(new PromiseListener<String>() {
- @Override
- public void operationCompleted(final Promise<String> promise) {
- assertThat(promise.get()).isEqualTo("Done");
- }
-
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationFailed(final Promise<String> promise) {
-
+ public void operationComplete(Future<String> future) {
+ assertThat(future.get()).isEqualTo("Done");
}
});
}
@@ -97,13 +89,9 @@ public class DefaultPromiseTest {
public void testAddListener_WithException_ListenerAfterSet() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.setFailure(exception);
- promise.addListener(new PromiseListener<String>() {
- @Override
- public void operationCompleted(final Promise<String> promise) {
- }
-
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationFailed(final Promise<String> promise) {
+ public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
@@ -112,13 +100,9 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
- promise.addListener(new PromiseListener<String>() {
- @Override
- public void operationCompleted(final Promise<String> promise) {
- }
-
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationFailed(final Promise<String> promise) {
+ public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
index 71ca11c..1a431d9 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
@@ -92,9 +92,9 @@ public class BeanUtilsTest {
@Test
public void testPopulate_ExistObj() {
CustomizedConfig config = new CustomizedConfig();
- config.setOmsConsumerId("NewConsumerId");
+ config.setConsumerId("NewConsumerId");
- Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
+ Assert.assertEquals(config.getConsumerId(), "NewConsumerId");
config = BeanUtils.populate(properties, config);
diff --git a/pom.xml b/pom.xml
index 6737ae4..f4184a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -592,7 +592,7 @@
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
- <version>0.1.0-alpha</version>
+ <version>0.3.0-alpha-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
--
To stop receiving notification emails like this one, please contact
[email protected].