This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c92e14168f [Fix][E2E] Fixed e2e test cases of Kafka did not work as 
expected (#9359)
c92e14168f is described below

commit c92e14168ff2650a98d6ddb200ad355fa4462098
Author: WenDing-Y <[email protected]>
AuthorDate: Wed May 28 10:42:54 2025 +0800

    [Fix][E2E] Fixed e2e test cases of Kafka did not work as expected (#9359)
---
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 70 ++++++++++++++++++----
 .../kafka/kafkasource_earliest_to_console.conf     | 12 +++-
 .../kafka/kafkasource_endTimestamp_to_console.conf | 12 +++-
 ...ce_format_error_handle_way_skip_to_console.conf |  7 +++
 .../kafka/kafkasource_group_offset_to_console.conf | 13 +++-
 .../kafkasource_specific_offsets_to_console.conf   | 12 +++-
 .../kafka/kafkasource_timestamp_to_console.conf    | 12 +++-
 7 files changed, 123 insertions(+), 15 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 4a4352e3fb..728c5ee8e0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -51,7 +51,9 @@ import 
org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -143,6 +145,27 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                 .atMost(180, TimeUnit.SECONDS)
                 .untilAsserted(this::initKafkaProducer);
 
+        Properties adminProps = new Properties();
+        adminProps.put(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+        // Set the retention time to -1 to read data older than 7 days.
+        try (AdminClient adminClient = AdminClient.create(adminProps)) {
+            NewTopic testTopicSource = new NewTopic("test_topic_source", 1, 
(short) 1);
+            testTopicSource.configs(Collections.singletonMap("retention.ms", 
"-1"));
+
+            NewTopic testTopicNativeSource = new 
NewTopic("test_topic_native_source", 1, (short) 1);
+            
testTopicNativeSource.configs(Collections.singletonMap("retention.ms", "-1"));
+
+            NewTopic testTopicSourceWithTimestamp =
+                    new NewTopic("test_topic_source_timestamp", 1, (short) 1);
+            
testTopicSourceWithTimestamp.configs(Collections.singletonMap("retention.ms", 
"-1"));
+
+            List<NewTopic> topics =
+                    Arrays.asList(
+                            testTopicSource, testTopicNativeSource, 
testTopicSourceWithTimestamp);
+            adminClient.createTopics(topics);
+        }
+
         log.info("Write 100 records to topic test_topic_source");
         DefaultSeaTunnelRowSerializer serializer =
                 DefaultSeaTunnelRowSerializer.create(
@@ -152,6 +175,18 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                         DEFAULT_FIELD_DELIMITER,
                         null);
         generateTestData(serializer::serializeRow, 0, 100);
+
+        DefaultSeaTunnelRowSerializer rowSerializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_source_timestamp",
+                        DEFAULT_FORMAT,
+                        new SeaTunnelRowType(
+                                new String[] {"id", "timestamp"},
+                                new SeaTunnelDataType[] {BasicType.LONG_TYPE, 
BasicType.LONG_TYPE}),
+                        "",
+                        null);
+        generateWithTimestampTestData(rowSerializer::serializeRow, 0, 100, 
1738395840000L);
+
         String topicName = "test_topic_native_source";
         generateNativeTestData("test_topic_native_source", 0, 100);
         nativeData = getKafkaRecordData(topicName);
@@ -412,16 +447,7 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     public void testSourceKafkaWithEndTimestamp(TestContainer container)
             throws IOException, InterruptedException {
-        DefaultSeaTunnelRowSerializer serializer =
-                DefaultSeaTunnelRowSerializer.create(
-                        "test_topic_source",
-                        DEFAULT_FORMAT,
-                        new SeaTunnelRowType(
-                                new String[] {"id", "timestamp"},
-                                new SeaTunnelDataType[] {BasicType.LONG_TYPE, 
BasicType.LONG_TYPE}),
-                        "",
-                        null);
-        generateWithTimestampTestData(serializer::serializeRow, 0, 100, 
1738395840000L);
+
         testKafkaWithEndTimestampToConsole(container);
     }
 
@@ -435,10 +461,34 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                         DEFAULT_FORMAT,
                         DEFAULT_FIELD_DELIMITER,
                         null);
+        generateTestData(row -> serializer.serializeRow(row), 0, 10);
+        commitOffset("test_topic_group", "SeaTunnel-Consumer-Group-Offset");
         generateTestData(row -> serializer.serializeRow(row), 100, 150);
         testKafkaGroupOffsetsToConsole(container);
     }
 
+    public void commitOffset(String topic, String groupId) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Collections.singletonList(topic));
+        try {
+            consumer.poll(Duration.ofSeconds(60));
+            consumer.commitSync();
+        } finally {
+            consumer.close();
+        }
+    }
+
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index fb3fae77fa..bd9fb2b3e4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -69,6 +69,16 @@ sink {
             ]
           }
         ]
+        row_rules = [
+                {
+                  rule_type = MIN_ROW
+                  rule_value = 100
+                },
+                {
+                  rule_type = MAX_ROW
+                  rule_value = 100
+                }
+              ]
       }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
index 8f2edca236..91646ea353 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf
@@ -26,7 +26,7 @@ env {
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
-    topic = "test_topic_source"
+    topic = "test_topic_source_timestamp"
     plugin_output = "kafka_table"
     # The default format is json, which is optional
     format = json
@@ -69,6 +69,16 @@ sink {
             ]
           }
         ]
+        row_rules = [
+                        {
+                          rule_type = MIN_ROW
+                          rule_value = 60
+                        },
+                        {
+                          rule_type = MAX_ROW
+                          rule_value = 60
+                        }
+                      ]
       }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 510dc27242..fc55de5733 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -90,6 +90,13 @@ sink {
             ]
           }
         ]
+        row_rules = [
+                {
+                  rule_type = MAX_ROW
+                  rule_value = 0
+                }
+              ]
+
       }
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 7934900039..653838c0f8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -29,6 +29,7 @@ source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_group"
+    consumer.group = "SeaTunnel-Consumer-Group-Offset"
     plugin_output = "kafka_table"
     # The default format is json, which is optional
     format = json
@@ -69,6 +70,16 @@ sink {
             ]
           }
         ]
+        row_rules = [
+                {
+                  rule_type = MIN_ROW
+                  rule_value = 50
+                },
+                {
+                  rule_type = MAX_ROW
+                  rule_value = 50
+                }
+              ]
       }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index 2e18fe6451..afbcd71436 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -71,6 +71,16 @@ sink {
             ]
           }
         ]
+        row_rules = [
+                        {
+                          rule_type = MIN_ROW
+                          rule_value = 50
+                        },
+                        {
+                          rule_type = MAX_ROW
+                          rule_value = 50
+                        }
+                      ]
       }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 6356b13f0a..126a7e2a89 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -68,6 +68,16 @@ sink {
             ]
           }
         ]
+        row_rules = [
+                {
+                  rule_type = MIN_ROW
+                  rule_value = 100
+                },
+                 {
+                  rule_type = MAX_ROW
+                  rule_value = 100
+                 }
+              ]
       }
   }
-}
\ No newline at end of file
+}

Reply via email to