This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 02a35c3979 [Feature] [Connector-Redis] Redis connector support delete data (#7994)
02a35c3979 is described below

commit 02a35c3979d905fe612c97f003876122b410a671
Author: limin <[email protected]>
AuthorDate: Wed Nov 13 18:45:50 2024 +0800

    [Feature] [Connector-Redis] Redis connector support delete data (#7994)
    
    Co-authored-by: limin <[email protected]>
---
 .../seatunnel/redis/client/RedisClient.java        | 23 ++++--
 .../seatunnel/redis/client/RedisClusterClient.java | 46 +++++++++---
 .../seatunnel/redis/client/RedisSingleClient.java  | 73 +++++++++++++-----
 .../seatunnel/redis/config/RedisDataType.java      | 35 +++++++++
 .../seatunnel/redis/sink/RedisSinkWriter.java      | 20 +++--
 .../connector/redis/RedisTestCaseTemplateIT.java   | 59 +++++++++++++++
 .../resources/fake-to-redis-test-delete-hash.conf  | 87 ++++++++++++++++++++++
 .../resources/fake-to-redis-test-delete-key.conf   | 87 ++++++++++++++++++++++
 .../resources/fake-to-redis-test-delete-list.conf  | 86 +++++++++++++++++++++
 .../resources/fake-to-redis-test-delete-set.conf   | 86 +++++++++++++++++++++
 .../resources/fake-to-redis-test-delete-zset.conf  | 86 +++++++++++++++++++++
 11 files changed, 648 insertions(+), 40 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
index 109c51f78a..af7894795d 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.client;
 
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 
@@ -92,17 +93,29 @@ public abstract class RedisClient extends Jedis {
     public abstract List<List<String>> batchGetZset(List<String> keys);
 
     public abstract void batchWriteString(
-            List<String> keys, List<String> values, long expireSeconds);
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds);
 
     public abstract void batchWriteList(
-            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+            List<RowKind> rowKinds,
+            List<String> keyBuffer,
+            List<String> valueBuffer,
+            long expireSeconds);
 
     public abstract void batchWriteSet(
-            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+            List<RowKind> rowKinds,
+            List<String> keyBuffer,
+            List<String> valueBuffer,
+            long expireSeconds);
 
     public abstract void batchWriteHash(
-            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+            List<RowKind> rowKinds,
+            List<String> keyBuffer,
+            List<String> valueBuffer,
+            long expireSeconds);
 
     public abstract void batchWriteZset(
-            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+            List<RowKind> rowKinds,
+            List<String> keyBuffer,
+            List<String> valueBuffer,
+            long expireSeconds);
 }
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
index bd687e6c9b..485499476c 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.client;
 
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 
@@ -97,42 +98,67 @@ public class RedisClusterClient extends RedisClient {
     }
 
     @Override
-    public void batchWriteString(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteString(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
-            RedisDataType.STRING.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
+                RedisDataType.STRING.del(this, keys.get(i), values.get(i));
+            } else {
+                RedisDataType.STRING.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            }
         }
     }
 
     @Override
-    public void batchWriteList(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteList(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
-            RedisDataType.LIST.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
+                RedisDataType.LIST.del(this, keys.get(i), values.get(i));
+            } else {
+                RedisDataType.LIST.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            }
         }
     }
 
     @Override
-    public void batchWriteSet(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteSet(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
-            RedisDataType.SET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
+                RedisDataType.SET.del(this, keys.get(i), values.get(i));
+            } else {
+                RedisDataType.SET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            }
         }
     }
 
     @Override
-    public void batchWriteHash(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteHash(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
-            RedisDataType.HASH.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
+                RedisDataType.HASH.del(this, keys.get(i), values.get(i));
+            } else {
+                RedisDataType.HASH.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            }
         }
     }
 
     @Override
-    public void batchWriteZset(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteZset(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
-            RedisDataType.ZSET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
+                RedisDataType.ZSET.del(this, keys.get(i), values.get(i));
+            } else {
+                RedisDataType.ZSET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+            }
         }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index f79aa46e98..c9d3ba6788 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.client;
 
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 
@@ -139,76 +140,108 @@ public class RedisSingleClient extends RedisClient {
     }
 
     @Override
-    public void batchWriteString(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteString(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
+            RowKind rowKind = rowKinds.get(i);
             String key = keys.get(i);
             String value = values.get(i);
-            pipelined.set(key, value);
-            if (expireSeconds > 0) {
-                pipelined.expire(key, expireSeconds);
+            if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
+                pipelined.del(key);
+            } else {
+                pipelined.set(key, value);
+                if (expireSeconds > 0) {
+                    pipelined.expire(key, expireSeconds);
+                }
             }
         }
         pipelined.sync();
     }
 
     @Override
-    public void batchWriteList(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteList(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
+            RowKind rowKind = rowKinds.get(i);
             String key = keys.get(i);
             String value = values.get(i);
-            pipelined.lpush(key, value);
-            if (expireSeconds > 0) {
-                pipelined.expire(key, expireSeconds);
+            if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
+                pipelined.lrem(key, 1, value);
+            } else {
+                pipelined.lpush(key, value);
+                if (expireSeconds > 0) {
+                    pipelined.expire(key, expireSeconds);
+                }
             }
         }
         pipelined.sync();
     }
 
     @Override
-    public void batchWriteSet(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteSet(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
+            RowKind rowKind = rowKinds.get(i);
             String key = keys.get(i);
             String value = values.get(i);
-            pipelined.sadd(key, value);
-            if (expireSeconds > 0) {
-                pipelined.expire(key, expireSeconds);
+            if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
+                pipelined.srem(key, value);
+            } else {
+                pipelined.sadd(key, value);
+                if (expireSeconds > 0) {
+                    pipelined.expire(key, expireSeconds);
+                }
             }
         }
         pipelined.sync();
     }
 
     @Override
-    public void batchWriteHash(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteHash(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
+            RowKind rowKind = rowKinds.get(i);
             String key = keys.get(i);
             String value = values.get(i);
             Map<String, String> fieldsMap = JsonUtils.toMap(value);
-            pipelined.hset(key, fieldsMap);
-            if (expireSeconds > 0) {
-                pipelined.expire(key, expireSeconds);
+            if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
+                for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
+                    pipelined.hdel(key, entry.getKey());
+                }
+            } else {
+                pipelined.hset(key, fieldsMap);
+                if (expireSeconds > 0) {
+                    pipelined.expire(key, expireSeconds);
+                }
             }
         }
         pipelined.sync();
     }
 
     @Override
-    public void batchWriteZset(List<String> keys, List<String> values, long 
expireSeconds) {
+    public void batchWriteZset(
+            List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
+            RowKind rowKind = rowKinds.get(i);
             String key = keys.get(i);
             String value = values.get(i);
-            pipelined.zadd(key, 1, value);
-            if (expireSeconds > 0) {
-                pipelined.expire(key, expireSeconds);
+            if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
+                pipelined.zrem(key, value);
+            } else {
+                pipelined.zadd(key, 1, value);
+                if (expireSeconds > 0) {
+                    pipelined.expire(key, expireSeconds);
+                }
             }
         }
         pipelined.sync();
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
index aac874254d..7929e8181d 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -39,6 +39,11 @@ public enum RedisDataType {
         public List<String> get(Jedis jedis, String key) {
             return Collections.singletonList(jedis.get(key));
         }
+
+        @Override
+        public void del(Jedis jedis, String key, String value) {
+            jedis.del(key);
+        }
     },
     STRING {
         @Override
@@ -51,6 +56,11 @@ public enum RedisDataType {
         public List<String> get(Jedis jedis, String key) {
             return Collections.singletonList(jedis.get(key));
         }
+
+        @Override
+        public void del(Jedis jedis, String key, String value) {
+            jedis.del(key);
+        }
     },
     HASH {
         @Override
@@ -65,6 +75,12 @@ public enum RedisDataType {
             Map<String, String> kvMap = jedis.hgetAll(key);
             return Collections.singletonList(JsonUtils.toJsonString(kvMap));
         }
+
+        @Override
+        public void del(Jedis jedis, String key, String value) {
+            Map<String, String> fieldsMap = JsonUtils.toMap(value);
+            fieldsMap.forEach((k, v) -> jedis.hdel(key, k));
+        }
     },
     LIST {
         @Override
@@ -77,6 +93,11 @@ public enum RedisDataType {
         public List<String> get(Jedis jedis, String key) {
             return jedis.lrange(key, 0, -1);
         }
+
+        @Override
+        public void del(Jedis jedis, String key, String value) {
+            jedis.lrem(key, 1, value);
+        }
     },
     SET {
         @Override
@@ -90,6 +111,11 @@ public enum RedisDataType {
             Set<String> members = jedis.smembers(key);
             return new ArrayList<>(members);
         }
+
+        @Override
+        public void del(Jedis jedis, String key, String value) {
+            jedis.srem(key, value);
+        }
     },
     ZSET {
         @Override
@@ -102,6 +128,11 @@ public enum RedisDataType {
         public List<String> get(Jedis jedis, String key) {
             return jedis.zrange(key, 0, -1);
         }
+
+        @Override
+        public void del(Jedis jedis, String key, String value) {
+            jedis.zrem(key, value);
+        }
     };
 
     public List<String> get(Jedis jedis, String key) {
@@ -117,4 +148,8 @@ public enum RedisDataType {
     public void set(Jedis jedis, String key, String value, long expire) {
         // do nothing
     }
+
+    public void del(Jedis jedis, String key, String value) {
+        // do nothing
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 9d5c73df2c..b42fc6107b 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.sink;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
@@ -51,6 +52,7 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
 
     private final int batchSize;
 
+    private final List<RowKind> rowKinds;
     private final List<String> keyBuffer;
     private final List<String> valueBuffer;
 
@@ -62,12 +64,14 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
         this.redisClient = redisParameters.buildRedisClient();
         this.batchSize = redisParameters.getBatchSize();
+        this.rowKinds = new ArrayList<>(batchSize);
         this.keyBuffer = new ArrayList<>(batchSize);
         this.valueBuffer = new ArrayList<>(batchSize);
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        rowKinds.add(element.getRowKind());
         List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
         String key = getKey(element, fields);
         keyBuffer.add(key);
@@ -173,6 +177,7 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
     }
 
     private void clearBuffer() {
+        rowKinds.clear();
         keyBuffer.clear();
         valueBuffer.clear();
     }
@@ -180,23 +185,28 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
     private void doBatchWrite() {
         RedisDataType redisDataType = redisParameters.getRedisDataType();
         if (RedisDataType.KEY.equals(redisDataType) || 
RedisDataType.STRING.equals(redisDataType)) {
-            redisClient.batchWriteString(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            redisClient.batchWriteString(
+                    rowKinds, keyBuffer, valueBuffer, 
redisParameters.getExpire());
             return;
         }
         if (RedisDataType.LIST.equals(redisDataType)) {
-            redisClient.batchWriteList(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            redisClient.batchWriteList(
+                    rowKinds, keyBuffer, valueBuffer, 
redisParameters.getExpire());
             return;
         }
         if (RedisDataType.SET.equals(redisDataType)) {
-            redisClient.batchWriteSet(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            redisClient.batchWriteSet(
+                    rowKinds, keyBuffer, valueBuffer, 
redisParameters.getExpire());
             return;
         }
         if (RedisDataType.HASH.equals(redisDataType)) {
-            redisClient.batchWriteHash(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            redisClient.batchWriteHash(
+                    rowKinds, keyBuffer, valueBuffer, 
redisParameters.getExpire());
             return;
         }
         if (RedisDataType.ZSET.equals(redisDataType)) {
-            redisClient.batchWriteZset(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            redisClient.batchWriteZset(
+                    rowKinds, keyBuffer, valueBuffer, 
redisParameters.getExpire());
             return;
         }
         throw new RedisConnectorException(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 0f67575ea4..efd9d8df44 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -442,5 +442,64 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
         jedis.del("custom-hash-check");
     }
 
+    @TestTemplate
+    public void testFakeToRedisDeleteHashTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-delete-hash.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(2, jedis.hlen("hash_check"));
+        jedis.del("hash_check");
+    }
+
+    @TestTemplate
+    public void testFakeToRedisDeleteKeyTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-delete-key.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        int count = 0;
+        for (int i = 1; i <= 3; i++) {
+            String data = jedis.get("key_check:" + i);
+            if (data != null) {
+                count++;
+            }
+        }
+        Assertions.assertEquals(2, count);
+        for (int i = 1; i <= 3; i++) {
+            jedis.del("key_check:" + i);
+        }
+    }
+
+    @TestTemplate
+    public void testFakeToRedisDeleteListTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-delete-list.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(2, jedis.llen("list_check"));
+        jedis.del("list_check");
+    }
+
+    @TestTemplate
+    public void testFakeToRedisDeleteSetTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-delete-set.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(2, jedis.scard("set_check"));
+        jedis.del("set_check");
+    }
+
+    @TestTemplate
+    public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-delete-zset.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(2, jedis.zcard("zset_check"));
+        jedis.del("zset_check");
+    }
+
     public abstract RedisContainerInfo getRedisContainerInfo();
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
new file mode 100644
index 0000000000..cffd866916
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "hash_check"
+    data_type = hash
+    hash_key_field = "id"
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
new file mode 100644
index 0000000000..5be915889e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "key_check:{id}"
+    data_type = key
+    support_custom_key = true
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
new file mode 100644
index 0000000000..55deb18754
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "list_check"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
new file mode 100644
index 0000000000..bd1c71128e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "set_check"
+    data_type = set
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
new file mode 100644
index 0000000000..cf80d3b00c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "zset_check"
+    data_type = zset
+    batch_size = 33
+  }
+}
\ No newline at end of file


Reply via email to