This is an automated email from the ASF dual-hosted git repository.

mxsm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 261223f36 [ISSUE#4531] Add unit test for RedisSinkConnector. (#4533)
261223f36 is described below

commit 261223f36ade3278537812a3a637ad5d0df58d4c
Author: yanrongzhen <[email protected]>
AuthorDate: Sun Nov 5 00:16:37 2023 +0800

    [ISSUE#4531] Add unit test for RedisSinkConnector. (#4533)
    
    * Add unit test for RedisSinkConnector.
    
    * Refactor test case.
    
    * Do some refactor.
    
    * shut down redis server.
    
    * Simplify test cases.
---
 .../eventmesh-connector-redis/build.gradle         |   3 +
 .../connector/redis/AbstractRedisServer.java}      |  27 ++++--
 .../sink/connector/RedisSinkConnectorTest.java     | 101 +++++++++++++++++++++
 .../src/test/resources/sink-config.yml             |  31 +++++++
 4 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/eventmesh-connectors/eventmesh-connector-redis/build.gradle b/eventmesh-connectors/eventmesh-connector-redis/build.gradle
index 42d3d4ced..425a10570 100644
--- a/eventmesh-connectors/eventmesh-connector-redis/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-redis/build.gradle
@@ -24,4 +24,7 @@ dependencies {
 
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
+
+    testImplementation 'ai.grakn:redis-mock:0.1.6'
+    testImplementation project(":eventmesh-common")
 }
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-redis/build.gradle b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java
similarity index 58%
copy from eventmesh-connectors/eventmesh-connector-redis/build.gradle
copy to eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java
index 42d3d4ced..46272496f 100644
--- a/eventmesh-connectors/eventmesh-connector-redis/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java
@@ -15,13 +15,26 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+package org.apache.eventmesh.connector.redis;
 
-    implementation 'org.redisson:redisson:3.17.3'
+import ai.grakn.redismock.RedisServer;
 
-    api 'io.cloudevents:cloudevents-json-jackson'
+public abstract class AbstractRedisServer {
 
-    compileOnly 'org.projectlombok:lombok'
-    annotationProcessor 'org.projectlombok:lombok'
-}
\ No newline at end of file
+    protected RedisServer redisServer;
+
+    public void setupRedisServer(int port) throws Exception {
+        redisServer = RedisServer.newRedisServer(port);
+        redisServer.start();
+    }
+
+    public void shutdownRedisServer() {
+        if (redisServer != null) {
+            redisServer.stop();
+        }
+    }
+
+    public static int getPortFromUrl(String url) {
+        return Integer.parseInt(url.substring(url.lastIndexOf(":") + 1));
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java
new file mode 100644
index 000000000..13ec4f737
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.redis.sink.connector;
+
+import org.apache.eventmesh.connector.redis.AbstractRedisServer;
+import org.apache.eventmesh.connector.redis.cloudevent.CloudEventCodec;
+import org.apache.eventmesh.connector.redis.sink.config.RedisSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.redisson.Redisson;
+import org.redisson.api.RTopic;
+import org.redisson.config.Config;
+
+import io.cloudevents.CloudEvent;
+
+public class RedisSinkConnectorTest extends AbstractRedisServer {
+
+    private RedisSinkConnector connector;
+
+    private Redisson redisson;
+
+    private RedisSinkConfig sinkConfig;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        connector = new RedisSinkConnector();
+        sinkConfig = (RedisSinkConfig) ConfigUtil.parse(connector.configClass());
+        setupRedisServer(getPortFromUrl(sinkConfig.getConnectorConfig().getServer()));
+        connector.init(sinkConfig);
+        connector.start();
+        Config config = new Config();
+        config.setCodec(CloudEventCodec.getInstance());
+        config.useSingleServer()
+            .setAddress(sinkConfig.getConnectorConfig().getServer());
+        redisson = (Redisson) Redisson.create(config);
+    }
+
+    @Test
+    public void testPutConnectRecords() throws InterruptedException {
+        RTopic topic = redisson.getTopic(sinkConfig.connectorConfig.getTopic());
+
+        final String expectedMessage = "\"testRedisMessage\"";
+        final int expectedCount = 5;
+        final CountDownLatch downLatch = new CountDownLatch(expectedCount);
+        topic.addListener(CloudEvent.class, (channel, msg) -> {
+            downLatch.countDown();
+            Assertions.assertNotNull(msg.getData());
+            Assertions.assertEquals(expectedMessage, new String(msg.getData().toBytes()));
+        });
+
+        List<ConnectRecord> records = new ArrayList<>();
+        for (int i = 0; i < expectedCount; i++) {
+            RecordPartition partition = new RecordPartition();
+            RecordOffset offset = new RecordOffset();
+            ConnectRecord connectRecord = new ConnectRecord(partition, offset, System.currentTimeMillis(),
+                expectedMessage.getBytes(StandardCharsets.UTF_8));
+            connectRecord.addExtension("id", String.valueOf(UUID.randomUUID()));
+            connectRecord.addExtension("source", "testSource");
+            connectRecord.addExtension("type", "testType");
+            records.add(connectRecord);
+        }
+        connector.put(records);
+        Assertions.assertTrue(downLatch.await(10, TimeUnit.SECONDS));
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        connector.stop();
+        redisson.shutdown();
+        shutdownRedisServer();
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..596006a2a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TopicTest
+    idc: FT
+    env: PRD
+    group: redisSink
+    appId: 5031
+    userName: redisSinkUser
+    passWord: redisPassWord
+connectorConfig:
+    connectorName: redisSink
+    server: redis://127.0.0.1:6379
+    topic: SinkTopic
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to