This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 42d6e556f [ISSUE#4536] Add unit test for RocketMQSinkConnector. (#4538)
42d6e556f is described below

commit 42d6e556f6f126591cda7a057468f5e22add17a1
Author: yanrongzhen <[email protected]>
AuthorDate: Sat Nov 4 18:33:45 2023 +0800

    [ISSUE#4536] Add unit test for RocketMQSinkConnector. (#4538)
    
    * Add unit test.
    
    * Refactor
---
 .../eventmesh-connector-rocketmq/build.gradle      |  3 +
 .../sink/connector/RocketMQSinkConnectorTest.java  | 93 ++++++++++++++++++++++
 .../src/test/resources/sink-config.yml             | 30 +++++++
 3 files changed, 126 insertions(+)

diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle
index ef6e907eb..769e9c6cf 100644
--- a/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-rocketmq/build.gradle
@@ -33,7 +33,10 @@ List rocketmq = [
 
 dependencies {
     api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+    implementation project(":eventmesh-common")
     implementation rocketmq
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
+    testImplementation "org.mockito:mockito-core"
+    testImplementation "org.mockito:mockito-junit-jupiter"
 }
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java
new file mode 100644
index 000000000..afd13a3f2
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnectorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.rocketmq.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.connector.rocketmq.sink.config.RocketMQSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class RocketMQSinkConnectorTest {
+
+    @InjectMocks
+    private RocketMQSinkConnector sinkConnector;
+
+    @Mock
+    private DefaultMQProducer producer;
+
+    private static final String EXPECTED_MESSAGE = "\"testMessage\"";
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        Mockito.doNothing().when(producer).start();
+        Mockito.doReturn(null).when(producer).send(Mockito.any(Message.class));
+        Field field = ReflectionSupport.findFields(sinkConnector.getClass(),
+            (f) -> f.getName().equals("producer"), HierarchyTraversalMode.BOTTOM_UP).get(0);
+        field.setAccessible(true);
+        field.set(sinkConnector, producer);
+        producer.start();
+        RocketMQSinkConfig sinkConfig = (RocketMQSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
+        sinkConnector.init(sinkConfig);
+        sinkConnector.start();
+    }
+
+    @Test
+    public void testRocketMQSinkConnector() throws Exception {
+        final int messageCount = 5;
+        sinkConnector.put(generateMockedRecords(messageCount));
+        verify(producer, times(messageCount)).send(any(Message.class));
+    }
+
+    private List<ConnectRecord> generateMockedRecords(final int messageCount) {
+        List<ConnectRecord> records = new ArrayList<>();
+        for (int i = 0; i < messageCount; i++) {
+            RecordPartition partition = new RecordPartition();
+            RecordOffset offset = new RecordOffset();
+            ConnectRecord connectRecord = new ConnectRecord(partition, offset, System.currentTimeMillis(),
+                EXPECTED_MESSAGE.getBytes(StandardCharsets.UTF_8));
+            connectRecord.addExtension("id", String.valueOf(UUID.randomUUID()));
+            records.add(connectRecord);
+        }
+        return records;
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..210361dc2
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TopicTest
+    idc: FT
+    env: PRD
+    group: rocketmqSink
+    appId: 5031
+    userName: rocketmqSinkUser
+    passWord: rocketmqPassWord
+connectorConfig:
+    connectorName: rocketmqSink
+    nameServer: 127.0.0.1:9876
+    topic: TopicTest


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to