This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e15757bcd7 [Feature][Redis] Flush data when the time reaches
checkpoint.interval and update test case (#8308)
e15757bcd7 is described below
commit e15757bcd7031a668b28d5f04d83b570366e23f6
Author: limin <[email protected]>
AuthorDate: Tue Dec 17 18:40:41 2024 +0800
[Feature][Redis] Flush data when the time reaches checkpoint.interval and
update test case (#8308)
---
.../seatunnel/redis/sink/RedisSinkWriter.java | 14 +++-
.../connector/redis/RedisTestCaseTemplateIT.java | 80 +++++++++++++++++++++-
.../resources/fake-to-redis-test-in-real-time.conf | 68 ++++++++++++++++++
3 files changed, 159 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 71739b5789..b634462fbf 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -39,6 +39,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
@@ -78,8 +79,7 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
String value = getValue(element, fields);
valueBuffer.add(value);
if (keyBuffer.size() >= batchSize) {
- doBatchWrite();
- clearBuffer();
+ flush();
}
}
@@ -221,6 +221,16 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
@Override
public void close() throws IOException {
+ flush();
+ }
+
+ @Override
+ public Optional<Void> prepareCommit() {
+ flush();
+ return Optional.empty();
+ }
+
+ private synchronized void flush() {
if (!keyBuffer.isEmpty()) {
doBatchWrite();
clearBuffer();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 96ac20cbe6..bdc66016db 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -16,6 +16,8 @@
*/
package org.apache.seatunnel.e2e.connector.redis;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -25,15 +27,20 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -52,13 +59,21 @@ import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.awaitility.Awaitility.await;
+
@Slf4j
public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements
TestResource {
@@ -492,7 +507,7 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
}
@TestTemplate
- public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
+ public void testFakeToToRedisDeleteZSetTest(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/fake-to-redis-test-delete-zset.conf");
@@ -501,6 +516,69 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
jedis.del("zset_check");
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Only support for seatunnel")
+ @DisabledOnOs(OS.WINDOWS)
+ public void testFakeToRedisInRealTimeTest(TestContainer container)
+ throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+
container.executeJob("/fake-to-redis-test-in-real-time.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(3,
jedis.llen("list_check"));
+ });
+ jedis.del("list_check");
+ // Get the task id
+ Container.ExecResult execResult = container.executeBaseCommand(new
String[] {"-l"});
+ String regex = "(\\d+)\\s+";
+ Pattern pattern = Pattern.compile(regex);
+ List<String> runningJobId =
+ Arrays.stream(execResult.getStdout().toString().split("\n"))
+ .filter(s ->
s.contains("fake-to-redis-test-in-real-time"))
+ .map(
+ s -> {
+ Matcher matcher = pattern.matcher(s);
+ return matcher.find() ? matcher.group(1) :
null;
+ })
+ .filter(jobId -> jobId != null)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(1, runningJobId.size());
+ // Verify that the status is Running
+ for (String jobId : runningJobId) {
+ Container.ExecResult execResult1 =
+ container.executeBaseCommand(new String[] {"-j", jobId});
+ String stdout = execResult1.getStdout();
+ ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
+ Assertions.assertEquals(jsonNodes.get("jobStatus").asText(),
"RUNNING");
+ }
+ // Execute cancellation task
+ String[] batchCancelCommand =
+ Stream.concat(Arrays.stream(new String[] {"-can"}),
runningJobId.stream())
+ .toArray(String[]::new);
+ Assertions.assertEquals(0,
container.executeBaseCommand(batchCancelCommand).getExitCode());
+
+ // Verify whether the cancellation is successful
+ for (String jobId : runningJobId) {
+ Container.ExecResult execResult1 =
+ container.executeBaseCommand(new String[] {"-j", jobId});
+ String stdout = execResult1.getStdout();
+ ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
+ Assertions.assertEquals(jsonNodes.get("jobStatus").asText(),
"CANCELED");
+ }
+ }
+
@TestTemplate
public void testFakeToRedisNormalKeyIsNullTest(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
new file mode 100644
index 0000000000..9e8829a391
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 10000
+ shade.identifier = "base64"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "list_check"
+ data_type = list
+ batch_size = 33
+ }
+}