This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 02a35c3979 [Feature] [Connector-Redis] Redis connector support delete data (#7994)
02a35c3979 is described below
commit 02a35c3979d905fe612c97f003876122b410a671
Author: limin <[email protected]>
AuthorDate: Wed Nov 13 18:45:50 2024 +0800
[Feature] [Connector-Redis] Redis connector support delete data (#7994)
Co-authored-by: limin <[email protected]>
---
.../seatunnel/redis/client/RedisClient.java | 23 ++++--
.../seatunnel/redis/client/RedisClusterClient.java | 46 +++++++++---
.../seatunnel/redis/client/RedisSingleClient.java | 73 +++++++++++++-----
.../seatunnel/redis/config/RedisDataType.java | 35 +++++++++
.../seatunnel/redis/sink/RedisSinkWriter.java | 20 +++--
.../connector/redis/RedisTestCaseTemplateIT.java | 59 +++++++++++++++
.../resources/fake-to-redis-test-delete-hash.conf | 87 ++++++++++++++++++++++
.../resources/fake-to-redis-test-delete-key.conf | 87 ++++++++++++++++++++++
.../resources/fake-to-redis-test-delete-list.conf | 86 +++++++++++++++++++++
.../resources/fake-to-redis-test-delete-set.conf | 86 +++++++++++++++++++++
.../resources/fake-to-redis-test-delete-zset.conf | 86 +++++++++++++++++++++
11 files changed, 648 insertions(+), 40 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
index 109c51f78a..af7894795d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.client;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
@@ -92,17 +93,29 @@ public abstract class RedisClient extends Jedis {
public abstract List<List<String>> batchGetZset(List<String> keys);
public abstract void batchWriteString(
- List<String> keys, List<String> values, long expireSeconds);
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds);
public abstract void batchWriteList(
- List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+ List<RowKind> rowKinds,
+ List<String> keyBuffer,
+ List<String> valueBuffer,
+ long expireSeconds);
public abstract void batchWriteSet(
- List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+ List<RowKind> rowKinds,
+ List<String> keyBuffer,
+ List<String> valueBuffer,
+ long expireSeconds);
public abstract void batchWriteHash(
- List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+ List<RowKind> rowKinds,
+ List<String> keyBuffer,
+ List<String> valueBuffer,
+ long expireSeconds);
public abstract void batchWriteZset(
- List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+ List<RowKind> rowKinds,
+ List<String> keyBuffer,
+ List<String> valueBuffer,
+ long expireSeconds);
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
index bd687e6c9b..485499476c 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.client;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
@@ -97,42 +98,67 @@ public class RedisClusterClient extends RedisClient {
}
@Override
- public void batchWriteString(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteString(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
- RedisDataType.STRING.set(this, keys.get(i), values.get(i),
expireSeconds);
+ if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
+ RedisDataType.STRING.del(this, keys.get(i), values.get(i));
+ } else {
+ RedisDataType.STRING.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
}
}
@Override
- public void batchWriteList(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteList(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
- RedisDataType.LIST.set(this, keys.get(i), values.get(i),
expireSeconds);
+ if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
+ RedisDataType.LIST.del(this, keys.get(i), values.get(i));
+ } else {
+ RedisDataType.LIST.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
}
}
@Override
- public void batchWriteSet(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteSet(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
- RedisDataType.SET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
+ RedisDataType.SET.del(this, keys.get(i), values.get(i));
+ } else {
+ RedisDataType.SET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
}
}
@Override
- public void batchWriteHash(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteHash(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
- RedisDataType.HASH.set(this, keys.get(i), values.get(i),
expireSeconds);
+ if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
+ RedisDataType.HASH.del(this, keys.get(i), values.get(i));
+ } else {
+ RedisDataType.HASH.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
}
}
@Override
- public void batchWriteZset(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteZset(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
- RedisDataType.ZSET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
+ RedisDataType.ZSET.del(this, keys.get(i), values.get(i));
+ } else {
+ RedisDataType.ZSET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index f79aa46e98..c9d3ba6788 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.client;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
@@ -139,76 +140,108 @@ public class RedisSingleClient extends RedisClient {
}
@Override
- public void batchWriteString(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteString(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
+ RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
- pipelined.set(key, value);
- if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
+ pipelined.del(key);
+ } else {
+ pipelined.set(key, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
}
}
pipelined.sync();
}
@Override
- public void batchWriteList(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteList(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
+ RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
- pipelined.lpush(key, value);
- if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
+ pipelined.lrem(key, 1, value);
+ } else {
+ pipelined.lpush(key, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
}
}
pipelined.sync();
}
@Override
- public void batchWriteSet(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteSet(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
+ RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
- pipelined.sadd(key, value);
- if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
+ pipelined.srem(key, value);
+ } else {
+ pipelined.sadd(key, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
}
}
pipelined.sync();
}
@Override
- public void batchWriteHash(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteHash(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
+ RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
Map<String, String> fieldsMap = JsonUtils.toMap(value);
- pipelined.hset(key, fieldsMap);
- if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
+ for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
+ pipelined.hdel(key, entry.getKey());
+ }
+ } else {
+ pipelined.hset(key, fieldsMap);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
}
}
pipelined.sync();
}
@Override
- public void batchWriteZset(List<String> keys, List<String> values, long
expireSeconds) {
+ public void batchWriteZset(
+ List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
+ RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
- pipelined.zadd(key, 1, value);
- if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
+ pipelined.zrem(key, value);
+ } else {
+ pipelined.zadd(key, 1, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
}
}
pipelined.sync();
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
index aac874254d..7929e8181d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -39,6 +39,11 @@ public enum RedisDataType {
public List<String> get(Jedis jedis, String key) {
return Collections.singletonList(jedis.get(key));
}
+
+ @Override
+ public void del(Jedis jedis, String key, String value) {
+ jedis.del(key);
+ }
},
STRING {
@Override
@@ -51,6 +56,11 @@ public enum RedisDataType {
public List<String> get(Jedis jedis, String key) {
return Collections.singletonList(jedis.get(key));
}
+
+ @Override
+ public void del(Jedis jedis, String key, String value) {
+ jedis.del(key);
+ }
},
HASH {
@Override
@@ -65,6 +75,12 @@ public enum RedisDataType {
Map<String, String> kvMap = jedis.hgetAll(key);
return Collections.singletonList(JsonUtils.toJsonString(kvMap));
}
+
+ @Override
+ public void del(Jedis jedis, String key, String value) {
+ Map<String, String> fieldsMap = JsonUtils.toMap(value);
+ fieldsMap.forEach((k, v) -> jedis.hdel(key, k));
+ }
},
LIST {
@Override
@@ -77,6 +93,11 @@ public enum RedisDataType {
public List<String> get(Jedis jedis, String key) {
return jedis.lrange(key, 0, -1);
}
+
+ @Override
+ public void del(Jedis jedis, String key, String value) {
+ jedis.lrem(key, 1, value);
+ }
},
SET {
@Override
@@ -90,6 +111,11 @@ public enum RedisDataType {
Set<String> members = jedis.smembers(key);
return new ArrayList<>(members);
}
+
+ @Override
+ public void del(Jedis jedis, String key, String value) {
+ jedis.srem(key, value);
+ }
},
ZSET {
@Override
@@ -102,6 +128,11 @@ public enum RedisDataType {
public List<String> get(Jedis jedis, String key) {
return jedis.zrange(key, 0, -1);
}
+
+ @Override
+ public void del(Jedis jedis, String key, String value) {
+ jedis.zrem(key, value);
+ }
};
public List<String> get(Jedis jedis, String key) {
@@ -117,4 +148,8 @@ public enum RedisDataType {
public void set(Jedis jedis, String key, String value, long expire) {
// do nothing
}
+
+ public void del(Jedis jedis, String key, String value) {
+ // do nothing
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 9d5c73df2c..b42fc6107b 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.sink;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
@@ -51,6 +52,7 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
private final int batchSize;
+ private final List<RowKind> rowKinds;
private final List<String> keyBuffer;
private final List<String> valueBuffer;
@@ -62,12 +64,14 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
this.redisClient = redisParameters.buildRedisClient();
this.batchSize = redisParameters.getBatchSize();
+ this.rowKinds = new ArrayList<>(batchSize);
this.keyBuffer = new ArrayList<>(batchSize);
this.valueBuffer = new ArrayList<>(batchSize);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
+ rowKinds.add(element.getRowKind());
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key = getKey(element, fields);
keyBuffer.add(key);
@@ -173,6 +177,7 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
}
private void clearBuffer() {
+ rowKinds.clear();
keyBuffer.clear();
valueBuffer.clear();
}
@@ -180,23 +185,28 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
private void doBatchWrite() {
RedisDataType redisDataType = redisParameters.getRedisDataType();
if (RedisDataType.KEY.equals(redisDataType) ||
RedisDataType.STRING.equals(redisDataType)) {
- redisClient.batchWriteString(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ redisClient.batchWriteString(
+ rowKinds, keyBuffer, valueBuffer,
redisParameters.getExpire());
return;
}
if (RedisDataType.LIST.equals(redisDataType)) {
- redisClient.batchWriteList(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ redisClient.batchWriteList(
+ rowKinds, keyBuffer, valueBuffer,
redisParameters.getExpire());
return;
}
if (RedisDataType.SET.equals(redisDataType)) {
- redisClient.batchWriteSet(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ redisClient.batchWriteSet(
+ rowKinds, keyBuffer, valueBuffer,
redisParameters.getExpire());
return;
}
if (RedisDataType.HASH.equals(redisDataType)) {
- redisClient.batchWriteHash(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ redisClient.batchWriteHash(
+ rowKinds, keyBuffer, valueBuffer,
redisParameters.getExpire());
return;
}
if (RedisDataType.ZSET.equals(redisDataType)) {
- redisClient.batchWriteZset(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ redisClient.batchWriteZset(
+ rowKinds, keyBuffer, valueBuffer,
redisParameters.getExpire());
return;
}
throw new RedisConnectorException(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 0f67575ea4..efd9d8df44 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -442,5 +442,64 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
jedis.del("custom-hash-check");
}
+ @TestTemplate
+ public void testFakeToRedisDeleteHashTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-delete-hash.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(2, jedis.hlen("hash_check"));
+ jedis.del("hash_check");
+ }
+
+ @TestTemplate
+ public void testFakeToRedisDeleteKeyTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-delete-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ int count = 0;
+ for (int i = 1; i <= 3; i++) {
+ String data = jedis.get("key_check:" + i);
+ if (data != null) {
+ count++;
+ }
+ }
+ Assertions.assertEquals(2, count);
+ for (int i = 1; i <= 3; i++) {
+ jedis.del("key_check:" + i);
+ }
+ }
+
+ @TestTemplate
+ public void testFakeToRedisDeleteListTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-delete-list.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(2, jedis.llen("list_check"));
+ jedis.del("list_check");
+ }
+
+ @TestTemplate
+ public void testFakeToRedisDeleteSetTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-delete-set.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(2, jedis.scard("set_check"));
+ jedis.del("set_check");
+ }
+
+ @TestTemplate
+ public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-delete-zset.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(2, jedis.zcard("zset_check"));
+ jedis.del("zset_check");
+ }
+
public abstract RedisContainerInfo getRedisContainerInfo();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
new file mode 100644
index 0000000000..cffd866916
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "hash_check"
+ data_type = hash
+ hash_key_field = "id"
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
new file mode 100644
index 0000000000..5be915889e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "key_check:{id}"
+ data_type = key
+ support_custom_key = true
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
new file mode 100644
index 0000000000..55deb18754
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "list_check"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
new file mode 100644
index 0000000000..bd1c71128e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "set_check"
+ data_type = set
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
new file mode 100644
index 0000000000..cf80d3b00c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "zset_check"
+ data_type = zset
+ batch_size = 33
+ }
+}
\ No newline at end of file