This is an automated email from the ASF dual-hosted git repository.
mxsm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 261223f36 [ISSUE#4531] Add unit test for RedisSinkConnector. (#4533)
261223f36 is described below
commit 261223f36ade3278537812a3a637ad5d0df58d4c
Author: yanrongzhen <[email protected]>
AuthorDate: Sun Nov 5 00:16:37 2023 +0800
[ISSUE#4531] Add unit test for RedisSinkConnector. (#4533)
* Add unit test for RedisSinkConnector.
* Refactor test case.
* Do some refactor.
* shut down redis server.
* Simplify test cases.
---
.../eventmesh-connector-redis/build.gradle | 3 +
.../connector/redis/AbstractRedisServer.java} | 27 ++++--
.../sink/connector/RedisSinkConnectorTest.java | 101 +++++++++++++++++++++
.../src/test/resources/sink-config.yml | 31 +++++++
4 files changed, 155 insertions(+), 7 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-redis/build.gradle b/eventmesh-connectors/eventmesh-connector-redis/build.gradle
index 42d3d4ced..425a10570 100644
--- a/eventmesh-connectors/eventmesh-connector-redis/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-redis/build.gradle
@@ -24,4 +24,7 @@ dependencies {
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
+
+ testImplementation 'ai.grakn:redis-mock:0.1.6'
+ testImplementation project(":eventmesh-common")
}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-redis/build.gradle b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java
similarity index 58%
copy from eventmesh-connectors/eventmesh-connector-redis/build.gradle
copy to eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java
index 42d3d4ced..46272496f 100644
--- a/eventmesh-connectors/eventmesh-connector-redis/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java
@@ -15,13 +15,26 @@
* limitations under the License.
*/
-dependencies {
- implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+package org.apache.eventmesh.connector.redis;
- implementation 'org.redisson:redisson:3.17.3'
+import ai.grakn.redismock.RedisServer;
- api 'io.cloudevents:cloudevents-json-jackson'
+public abstract class AbstractRedisServer {
- compileOnly 'org.projectlombok:lombok'
- annotationProcessor 'org.projectlombok:lombok'
-}
\ No newline at end of file
+ protected RedisServer redisServer;
+
+ public void setupRedisServer(int port) throws Exception {
+ redisServer = RedisServer.newRedisServer(port);
+ redisServer.start();
+ }
+
+ public void shutdownRedisServer() {
+ if (redisServer != null) {
+ redisServer.stop();
+ }
+ }
+
+ public static int getPortFromUrl(String url) {
+ return Integer.parseInt(url.substring(url.lastIndexOf(":") + 1));
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java
new file mode 100644
index 000000000..13ec4f737
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.redis.sink.connector;
+
+import org.apache.eventmesh.connector.redis.AbstractRedisServer;
+import org.apache.eventmesh.connector.redis.cloudevent.CloudEventCodec;
+import org.apache.eventmesh.connector.redis.sink.config.RedisSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.redisson.Redisson;
+import org.redisson.api.RTopic;
+import org.redisson.config.Config;
+
+import io.cloudevents.CloudEvent;
+
+public class RedisSinkConnectorTest extends AbstractRedisServer {
+
+ private RedisSinkConnector connector;
+
+ private Redisson redisson;
+
+ private RedisSinkConfig sinkConfig;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ connector = new RedisSinkConnector();
+ sinkConfig = (RedisSinkConfig) ConfigUtil.parse(connector.configClass());
+ setupRedisServer(getPortFromUrl(sinkConfig.getConnectorConfig().getServer()));
+ connector.init(sinkConfig);
+ connector.start();
+ Config config = new Config();
+ config.setCodec(CloudEventCodec.getInstance());
+ config.useSingleServer()
+ .setAddress(sinkConfig.getConnectorConfig().getServer());
+ redisson = (Redisson) Redisson.create(config);
+ }
+
+ @Test
+ public void testPutConnectRecords() throws InterruptedException {
+ RTopic topic = redisson.getTopic(sinkConfig.connectorConfig.getTopic());
+
+ final String expectedMessage = "\"testRedisMessage\"";
+ final int expectedCount = 5;
+ final CountDownLatch downLatch = new CountDownLatch(expectedCount);
+ topic.addListener(CloudEvent.class, (channel, msg) -> {
+ downLatch.countDown();
+ Assertions.assertNotNull(msg.getData());
+ Assertions.assertEquals(expectedMessage, new String(msg.getData().toBytes()));
+ });
+
+ List<ConnectRecord> records = new ArrayList<>();
+ for (int i = 0; i < expectedCount; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset, System.currentTimeMillis(),
+ expectedMessage.getBytes(StandardCharsets.UTF_8));
+ connectRecord.addExtension("id", String.valueOf(UUID.randomUUID()));
+ connectRecord.addExtension("source", "testSource");
+ connectRecord.addExtension("type", "testType");
+ records.add(connectRecord);
+ }
+ connector.put(records);
+ Assertions.assertTrue(downLatch.await(10, TimeUnit.SECONDS));
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ connector.stop();
+ redisson.shutdown();
+ shutdownRedisServer();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..596006a2a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: redisSink
+ appId: 5031
+ userName: redisSinkUser
+ passWord: redisPassWord
+connectorConfig:
+ connectorName: redisSink
+ server: redis://127.0.0.1:6379
+ topic: SinkTopic
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]