This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop_oms_0.3.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop_oms_0.3.0 by this push:
new 4fb72cd Fix tests
4fb72cd is described below
commit 4fb72cd151d7a3607b5aef2b14c3b2de36e15a2a
Author: shutian.lzh <[email protected]>
AuthorDate: Tue Apr 17 10:50:18 2018 +0800
Fix tests
---
.../rocketmq/client/log/ClientLoggerTest.java | 5 +-
.../example/openmessaging/SimpleProducer.java | 1 +
.../rocketmq/config/ClientConfig.java | 124 ++++++++++-----------
.../rocketmq/consumer/LocalMessageCache.java | 4 +-
.../rocketmq/consumer/PullConsumerImpl.java | 4 +-
.../rocketmq/consumer/PushConsumerImpl.java | 4 +-
.../rocketmq/producer/AbstractOMSProducer.java | 4 +-
.../io/openmessaging/rocketmq/utils/BeanUtils.java | 2 +-
.../rocketmq/consumer/PullConsumerImplTest.java | 7 +-
.../rocketmq/consumer/PushConsumerImplTest.java | 5 +-
.../rocketmq/producer/ProducerImplTest.java | 2 +-
.../rocketmq/utils/BeanUtilsTest.java | 4 +-
12 files changed, 86 insertions(+), 80 deletions(-)
diff --git a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
index 9fe0d8b..4888186 100644
--- a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
@@ -49,7 +49,10 @@ public class ClientLoggerTest {
rocketmqCommon.info("common message {}", i, new RuntimeException());
rocketmqRemoting.info("remoting message {}", i, new RuntimeException());
}
-
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignore) {
+ }
String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log");
Assert.assertTrue(content.contains("testClientlog"));
Assert.assertTrue(content.contains("RocketmqClient"));
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index f993225..dbe1d10 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -69,6 +69,7 @@ public class SimpleProducer {
try {
countDownLatch.await();
+ Thread.sleep(500); // Wait some time for one-way delivery.
} catch (InterruptedException ignore) {
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
index 774a7bc..a5dfe49 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
@@ -20,16 +20,16 @@ import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
- private String omsDriverImpl;
- private String omsAccessPoints;
- private String omsNamespace;
- private String omsProducerId;
- private String omsConsumerId;
- private int omsOperationTimeout = 5000;
- private String omsRoutingName;
- private String omsOperatorName;
- private String omsDstQueue;
- private String omsSrcTopic;
+ private String driverImpl;
+ private String accessPoints;
+ private String namespace;
+ private String producerId;
+ private String consumerId;
+ private int operationTimeout = 5000;
+ private String region;
+ private String routingSource;
+ private String routingDestination;
+ private String routingExpression;
private String rmqConsumerGroup;
private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
private int rmqMaxRedeliveryTimes = 16;
@@ -40,84 +40,60 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
private int rmqPullMessageBatchNums = 32;
private int rmqPullMessageCacheCapacity = 1000;
- public String getOmsDriverImpl() {
- return omsDriverImpl;
+ public String getDriverImpl() {
+ return driverImpl;
}
- public void setOmsDriverImpl(final String omsDriverImpl) {
- this.omsDriverImpl = omsDriverImpl;
+ public void setDriverImpl(final String driverImpl) {
+ this.driverImpl = driverImpl;
}
- public String getOmsAccessPoints() {
- return omsAccessPoints;
+ public String getAccessPoints() {
+ return accessPoints;
}
- public void setOmsAccessPoints(final String omsAccessPoints) {
- this.omsAccessPoints = omsAccessPoints;
+ public void setAccessPoints(final String accessPoints) {
+ this.accessPoints = accessPoints;
}
- public String getOmsNamespace() {
- return omsNamespace;
+ public String getNamespace() {
+ return namespace;
}
- public void setOmsNamespace(final String omsNamespace) {
- this.omsNamespace = omsNamespace;
+ public void setNamespace(final String namespace) {
+ this.namespace = namespace;
}
- public String getOmsProducerId() {
- return omsProducerId;
+ public String getProducerId() {
+ return producerId;
}
- public void setOmsProducerId(final String omsProducerId) {
- this.omsProducerId = omsProducerId;
+ public void setProducerId(final String producerId) {
+ this.producerId = producerId;
}
- public String getOmsConsumerId() {
- return omsConsumerId;
+ public String getConsumerId() {
+ return consumerId;
}
- public void setOmsConsumerId(final String omsConsumerId) {
- this.omsConsumerId = omsConsumerId;
+ public void setConsumerId(final String consumerId) {
+ this.consumerId = consumerId;
}
- public int getOmsOperationTimeout() {
- return omsOperationTimeout;
+ public int getOperationTimeout() {
+ return operationTimeout;
}
- public void setOmsOperationTimeout(final int omsOperationTimeout) {
- this.omsOperationTimeout = omsOperationTimeout;
+ public void setOperationTimeout(final int operationTimeout) {
+ this.operationTimeout = operationTimeout;
}
- public String getOmsRoutingName() {
- return omsRoutingName;
+ public String getRoutingSource() {
+ return routingSource;
}
- public void setOmsRoutingName(final String omsRoutingName) {
- this.omsRoutingName = omsRoutingName;
- }
-
- public String getOmsOperatorName() {
- return omsOperatorName;
- }
-
- public void setOmsOperatorName(final String omsOperatorName) {
- this.omsOperatorName = omsOperatorName;
- }
-
- public String getOmsDstQueue() {
- return omsDstQueue;
- }
-
- public void setOmsDstQueue(final String omsDstQueue) {
- this.omsDstQueue = omsDstQueue;
- }
-
- public String getOmsSrcTopic() {
- return omsSrcTopic;
- }
-
- public void setOmsSrcTopic(final String omsSrcTopic) {
- this.omsSrcTopic = omsSrcTopic;
+ public void setRoutingSource(final String routingSource) {
+ this.routingSource = routingSource;
}
public String getRmqConsumerGroup() {
@@ -191,4 +167,28 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
}
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public String getRoutingDestination() {
+ return routingDestination;
+ }
+
+ public void setRoutingDestination(String routingDestination) {
+ this.routingDestination = routingDestination;
+ }
+
+ public String getRoutingExpression() {
+ return routingExpression;
+ }
+
+ public void setRoutingExpression(String routingExpression) {
+ this.routingExpression = routingExpression;
+ }
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
index 872d8fb..93e60a7 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -91,11 +91,11 @@ class LocalMessageCache implements ServiceLifecycle {
}
MessageExt poll() {
- return poll(clientConfig.getOmsOperationTimeout());
+ return poll(clientConfig.getOperationTimeout());
}
MessageExt poll(final KeyValue properties) {
- int currentPollTimeout = clientConfig.getOmsOperationTimeout();
+ int currentPollTimeout = clientConfig.getOperationTimeout();
if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) {
currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT);
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 8bc7a77..225b09e 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -52,7 +52,7 @@ public class PullConsumerImpl implements PullConsumer {
this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
- String consumerGroup = clientConfig.getOmsConsumerId();
+ String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
}
@@ -60,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
- String accessPoints = clientConfig.getOmsAccessPoints();
+ String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index 8910676..9bfd7c8 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer {
this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
- String accessPoints = clientConfig.getOmsAccessPoints();
+ String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
- String consumerGroup = clientConfig.getOmsConsumerId();
+ String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 2e99fd6..f733756 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -52,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer = new DefaultMQProducer();
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
- String accessPoints = clientConfig.getOmsAccessPoints();
+ String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
@@ -60,7 +60,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
String producerId = buildInstanceName();
- this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
+ this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
index 054374b..ef9236f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -171,7 +171,7 @@ public final class BeanUtils {
}
String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
try {
- setProperties(clazz, obj, "setOms" + beanFieldNameWithCapitalization, properties.getString(key));
+ setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key));
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
//ignored...
}
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
index 7e81b40..da2e8a0 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
+import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
@@ -48,9 +49,9 @@ public class PullConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
- .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
- consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+ consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
consumer.attachQueue(queueName);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
@@ -58,7 +59,7 @@ public class PullConsumerImplTest {
field.set(consumer, rocketmqPullConsumer); //Replace
ClientConfig clientConfig = new ClientConfig();
- clientConfig.setOmsOperationTimeout(200);
+ clientConfig.setOperationTimeout(200);
localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig));
field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
index 5caa2b6..b55816b 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -18,6 +18,7 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
+import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
@@ -47,9 +48,9 @@ public class PushConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
- .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer(
- OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+ OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true);
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
index 7b36179..fc6515e 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -50,7 +50,7 @@ public class ProducerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
- .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
index 71ca11c..1a431d9 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
@@ -92,9 +92,9 @@ public class BeanUtilsTest {
@Test
public void testPopulate_ExistObj() {
CustomizedConfig config = new CustomizedConfig();
- config.setOmsConsumerId("NewConsumerId");
+ config.setConsumerId("NewConsumerId");
- Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
+ Assert.assertEquals(config.getConsumerId(), "NewConsumerId");
config = BeanUtils.populate(properties, config);
--
To stop receiving notification emails like this one, please contact
[email protected].