This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 42d6e556f [ISSUE#4536] Add unit test for RocketMQSinkConnector. (#4538)
42d6e556f is described below
commit 42d6e556f6f126591cda7a057468f5e22add17a1
Author: yanrongzhen <[email protected]>
AuthorDate: Sat Nov 4 18:33:45 2023 +0800
[ISSUE#4536] Add unit test for RocketMQSinkConnector. (#4538)
* Add unit test.
* Refactor
---
.../eventmesh-connector-rocketmq/build.gradle | 3 +
.../sink/connector/RocketMQSinkConnectorTest.java | 93 ++++++++++++++++++++++
.../src/test/resources/sink-config.yml | 30 +++++++
3 files changed, 126 insertions(+)
diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle
b/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle
index ef6e907eb..769e9c6cf 100644
--- a/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle
@@ -33,7 +33,10 @@ List rocketmq = [
dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+ implementation project(":eventmesh-common")
implementation rocketmq
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
+ testImplementation "org.mockito:mockito-core"
+ testImplementation "org.mockito:mockito-junit-jupiter"
}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java
new file mode 100644
index 000000000..afd13a3f2
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.rocketmq.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.connector.rocketmq.sink.config.RocketMQSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class RocketMQSinkConnectorTest {
+
+ @InjectMocks
+ private RocketMQSinkConnector sinkConnector;
+
+ @Mock
+ private DefaultMQProducer producer;
+
+ private static final String EXPECTED_MESSAGE = "\"testMessage\"";
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ Mockito.doNothing().when(producer).start();
+ Mockito.doReturn(null).when(producer).send(Mockito.any(Message.class));
+ Field field = ReflectionSupport.findFields(sinkConnector.getClass(),
+ (f) -> f.getName().equals("producer"),
HierarchyTraversalMode.BOTTOM_UP).get(0);
+ field.setAccessible(true);
+ field.set(sinkConnector, producer);
+ producer.start();
+ RocketMQSinkConfig sinkConfig = (RocketMQSinkConfig)
ConfigUtil.parse(sinkConnector.configClass());
+ sinkConnector.init(sinkConfig);
+ sinkConnector.start();
+ }
+
+ @Test
+ public void testRocketMQSinkConnector() throws Exception {
+ final int messageCount = 5;
+ sinkConnector.put(generateMockedRecords(messageCount));
+ verify(producer, times(messageCount)).send(any(Message.class));
+ }
+
+ private List<ConnectRecord> generateMockedRecords(final int messageCount) {
+ List<ConnectRecord> records = new ArrayList<>();
+ for (int i = 0; i < messageCount; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
+ EXPECTED_MESSAGE.getBytes(StandardCharsets.UTF_8));
+ connectRecord.addExtension("id",
String.valueOf(UUID.randomUUID()));
+ records.add(connectRecord);
+ }
+ return records;
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..210361dc2
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: rocketmqSink
+ appId: 5031
+ userName: rocketmqSinkUser
+ passWord: rocketmqPassWord
+connectorConfig:
+ connectorName: rocketmqSink
+ nameServer: 127.0.0.1:9876
+ topic: TopicTest
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]