This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e15757bcd7 [Feature][Redis] Flush data when the time reaches checkpoint.interval and update test case (#8308)
e15757bcd7 is described below

commit e15757bcd7031a668b28d5f04d83b570366e23f6
Author: limin <[email protected]>
AuthorDate: Tue Dec 17 18:40:41 2024 +0800

    [Feature][Redis] Flush data when the time reaches checkpoint.interval and update test case (#8308)
---
 .../seatunnel/redis/sink/RedisSinkWriter.java      | 14 +++-
 .../connector/redis/RedisTestCaseTemplateIT.java   | 80 +++++++++++++++++++++-
 .../resources/fake-to-redis-test-in-real-time.conf | 68 ++++++++++++++++++
 3 files changed, 159 insertions(+), 3 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 71739b5789..b634462fbf 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -39,6 +39,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         implements SupportMultiTableSinkWriter<Void> {
@@ -78,8 +79,7 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         String value = getValue(element, fields);
         valueBuffer.add(value);
         if (keyBuffer.size() >= batchSize) {
-            doBatchWrite();
-            clearBuffer();
+            flush();
         }
     }
 
@@ -221,6 +221,16 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
 
     @Override
     public void close() throws IOException {
+        flush();
+    }
+
+    @Override
+    public Optional<Void> prepareCommit() {
+        flush();
+        return Optional.empty();
+    }
+
+    private synchronized void flush() {
         if (!keyBuffer.isEmpty()) {
             doBatchWrite();
             clearBuffer();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 96ac20cbe6..bdc66016db 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -16,6 +16,8 @@
  */
 package org.apache.seatunnel.e2e.connector.redis;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -25,15 +27,20 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -52,13 +59,21 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.awaitility.Awaitility.await;
+
 @Slf4j
 public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements 
TestResource {
 
@@ -492,7 +507,7 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
     }
 
     @TestTemplate
-    public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
+    public void testFakeToToRedisDeleteZSetTest(TestContainer container)
             throws IOException, InterruptedException {
         Container.ExecResult execResult =
                 container.executeJob("/fake-to-redis-test-delete-zset.conf");
@@ -501,6 +516,69 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
         jedis.del("zset_check");
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Only support for seatunnel")
+    @DisabledOnOs(OS.WINDOWS)
+    public void testFakeToRedisInRealTimeTest(TestContainer container)
+            throws IOException, InterruptedException {
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        
container.executeJob("/fake-to-redis-test-in-real-time.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(3, 
jedis.llen("list_check"));
+                        });
+        jedis.del("list_check");
+        // Get the task id
+        Container.ExecResult execResult = container.executeBaseCommand(new 
String[] {"-l"});
+        String regex = "(\\d+)\\s+";
+        Pattern pattern = Pattern.compile(regex);
+        List<String> runningJobId =
+                Arrays.stream(execResult.getStdout().toString().split("\n"))
+                        .filter(s -> 
s.contains("fake-to-redis-test-in-real-time"))
+                        .map(
+                                s -> {
+                                    Matcher matcher = pattern.matcher(s);
+                                    return matcher.find() ? matcher.group(1) : 
null;
+                                })
+                        .filter(jobId -> jobId != null)
+                        .collect(Collectors.toList());
+        Assertions.assertEquals(1, runningJobId.size());
+        // Verify that the status is Running
+        for (String jobId : runningJobId) {
+            Container.ExecResult execResult1 =
+                    container.executeBaseCommand(new String[] {"-j", jobId});
+            String stdout = execResult1.getStdout();
+            ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
+            Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), 
"RUNNING");
+        }
+        // Execute cancellation task
+        String[] batchCancelCommand =
+                Stream.concat(Arrays.stream(new String[] {"-can"}), 
runningJobId.stream())
+                        .toArray(String[]::new);
+        Assertions.assertEquals(0, 
container.executeBaseCommand(batchCancelCommand).getExitCode());
+
+        // Verify whether the cancellation is successful
+        for (String jobId : runningJobId) {
+            Container.ExecResult execResult1 =
+                    container.executeBaseCommand(new String[] {"-j", jobId});
+            String stdout = execResult1.getStdout();
+            ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
+            Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), 
"CANCELED");
+        }
+    }
+
     @TestTemplate
     public void testFakeToRedisNormalKeyIsNullTest(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
new file mode 100644
index 0000000000..9e8829a391
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 10000
+  shade.identifier = "base64"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "list_check"
+    data_type = list
+    batch_size = 33
+  }
+}

Reply via email to