This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop_oms_0.3.0
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop_oms_0.3.0 by this push:
new 2988a1e Make unit tests compilable
2988a1e is described below
commit 2988a1e6caa8d21413ef6ff60cc04a8cfb3dae20
Author: shutian.lzh <[email protected]>
AuthorDate: Sun Apr 15 17:00:13 2018 +0800
Make unit tests compilable
---
.../example/openmessaging/SimpleProducer.java | 4 +-
.../rocketmq/consumer/PullConsumerImplTest.java | 19 ++---
.../rocketmq/consumer/PushConsumerImplTest.java | 13 ++--
.../rocketmq/producer/ProducerImplTest.java | 14 ++--
.../producer/SequenceProducerImplTest.java | 86 ----------------------
.../rocketmq/promise/DefaultPromiseTest.java | 38 +++-------
6 files changed, 33 insertions(+), 141 deletions(-)
diff --git
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 2884797..c785504 100644
---
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -27,8 +27,8 @@ import java.nio.charset.Charset;
public class SimpleProducer {
public static void main(String[] args) {
- final MessagingAccessPoint messagingAccessPoint = OMS
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+ final MessagingAccessPoint messagingAccessPoint =
+
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final Producer producer = messagingAccessPoint.createProducer();
diff --git
a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
index 843ddb7..7e81b40 100644
---
a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
+++
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -18,12 +18,9 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PullConsumer;
+import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
@@ -50,11 +47,11 @@ public class PullConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
+ final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
- consumer = messagingAccessPoint.createPullConsumer(queueName,
- OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"TestGroup"));
+ consumer =
messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"TestGroup"));
+ consumer.attachQueue(queueName);
Field field =
PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
@@ -83,18 +80,18 @@ public class PullConsumerImplTest {
when(localMessageCache.poll()).thenReturn(consumedMsg);
- Message message = consumer.poll();
-
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+ Message message = consumer.receive();
+
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
}
@Test
public void testPoll_WithTimeout() {
//There is a default timeout value, @see
ClientConfig#omsOperationTimeout.
- Message message = consumer.poll();
+ Message message = consumer.receive();
assertThat(message).isNull();
- message =
consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+ message =
consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
assertThat(message).isNull();
}
}
\ No newline at end of file
diff --git
a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
index 882e57e..5caa2b6 100644
---
a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
+++
b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -18,13 +18,10 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessageListener;
+import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.util.Collections;
@@ -49,7 +46,7 @@ public class PushConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
+ final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP,
"TestGroup"));
@@ -75,8 +72,8 @@ public class PushConsumerImplTest {
consumedMsg.setTopic("HELLO_QUEUE");
consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
@Override
- public void onMessage(final Message message, final
ReceivedMessageContext context) {
-
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+ public void onReceived(Message message, Context context) {
+
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage)
message).getBody()).isEqualTo(testBody);
context.ack();
}
diff --git
a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
index 1db80c3..7b36179 100644
---
a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
+++
b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -17,9 +17,9 @@
package io.openmessaging.rocketmq.producer;
import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.Producer;
+import io.openmessaging.OMS;
import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.producer.Producer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -49,7 +49,7 @@ public class ProducerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
+ final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer();
@@ -67,8 +67,8 @@ public class ProducerImplTest {
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class),
anyLong())).thenReturn(sendResult);
- io.openmessaging.SendResult omsResult =
- producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC",
new byte[] {'a'}));
+ io.openmessaging.producer.SendResult omsResult =
+ producer.send(producer.createBytesMessage("HELLO_TOPIC", new
byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
@@ -80,7 +80,7 @@ public class ProducerImplTest {
when(rocketmqProducer.send(any(Message.class),
anyLong())).thenReturn(sendResult);
try {
- producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC",
new byte[] {'a'}));
+ producer.send(producer.createBytesMessage("HELLO_TOPIC", new
byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ
broker failed.");
@@ -91,7 +91,7 @@ public class ProducerImplTest {
public void testSend_WithException() throws InterruptedException,
RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class),
anyLong())).thenThrow(MQClientException.class);
try {
- producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC",
new byte[] {'a'}));
+ producer.send(producer.createBytesMessage("HELLO_TOPIC", new
byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ
broker failed.");
diff --git
a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
deleted file mode 100644
index 823fe01..0000000
---
a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.producer;
-
-import io.openmessaging.BytesMessage;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.SequenceProducer;
-import java.lang.reflect.Field;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class SequenceProducerImplTest {
-
- private SequenceProducer producer;
-
- @Mock
- private DefaultMQProducer rocketmqProducer;
-
- @Before
- public void init() throws NoSuchFieldException, IllegalAccessException {
- final MessagingAccessPoint messagingAccessPoint =
MessagingAccessPointFactory
-
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
- producer = messagingAccessPoint.createSequenceProducer();
-
- Field field =
AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
- field.setAccessible(true);
- field.set(producer, rocketmqProducer);
-
- messagingAccessPoint.startup();
- producer.startup();
- }
-
- @Test
- public void testSend_WithCommit() throws InterruptedException,
RemotingException, MQClientException, MQBrokerException {
- SendResult sendResult = new SendResult();
- sendResult.setMsgId("TestMsgID");
- sendResult.setSendStatus(SendStatus.SEND_OK);
-
when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
- when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
- final BytesMessage message =
producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
- producer.send(message);
- producer.commit();
-
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
- }
-
- @Test
- public void testRollback() {
- when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
- final BytesMessage message =
producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
- producer.send(message);
- producer.rollback();
- producer.commit(); //Commit nothing.
-
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
- }
-}
\ No newline at end of file
diff --git
a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
index 2240ff2..f226ede 100644
---
a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
+++
b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -16,8 +16,9 @@
*/
package io.openmessaging.rocketmq.promise;
+import io.openmessaging.Future;
+import io.openmessaging.FutureListener;
import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.junit.Before;
import org.junit.Test;
@@ -63,14 +64,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener() throws Exception {
- promise.addListener(new PromiseListener<String>() {
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationCompleted(final Promise<String> promise) {
+ public void operationComplete(Future<String> future) {
assertThat(promise.get()).isEqualTo("Done");
- }
-
- @Override
- public void operationFailed(final Promise<String> promise) {
}
});
@@ -80,15 +77,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_ListenerAfterSet() throws Exception {
promise.set("Done");
- promise.addListener(new PromiseListener<String>() {
- @Override
- public void operationCompleted(final Promise<String> promise) {
- assertThat(promise.get()).isEqualTo("Done");
- }
-
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationFailed(final Promise<String> promise) {
-
+ public void operationComplete(Future<String> future) {
+ assertThat(future.get()).isEqualTo("Done");
}
});
}
@@ -97,13 +89,9 @@ public class DefaultPromiseTest {
public void testAddListener_WithException_ListenerAfterSet() throws
Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test
Error");
promise.setFailure(exception);
- promise.addListener(new PromiseListener<String>() {
- @Override
- public void operationCompleted(final Promise<String> promise) {
- }
-
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationFailed(final Promise<String> promise) {
+ public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
@@ -112,13 +100,9 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test
Error");
- promise.addListener(new PromiseListener<String>() {
- @Override
- public void operationCompleted(final Promise<String> promise) {
- }
-
+ promise.addListener(new FutureListener<String>() {
@Override
- public void operationFailed(final Promise<String> promise) {
+ public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
--
To stop receiving notification emails like this one, please contact
[email protected].