This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new db6e2994d4 [improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642)
db6e2994d4 is described below

commit db6e2994d4efcc8cd3865a0e5dd751645b411800
Author: Jarvis <[email protected]>
AuthorDate: Mon Feb 17 20:29:58 2025 +0800

    [improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642)
---
 .../seatunnel/kafka/source/KafkaSourceConfig.java  | 12 +----
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 20 +++++++
 .../kafka_source_text_with_no_schema.conf          | 63 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 11 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 981dbc0847..53dc27a673 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -218,16 +217,7 @@ public class KafkaSourceConfig implements Serializable {
                     TableSchema.builder()
                             .column(
                                     PhysicalColumn.of(
-                                            "content",
-                                            new SeaTunnelRowType(
-                                                    new String[] {"content"},
-                                                    new SeaTunnelDataType<?>[] 
{
-                                                        BasicType.STRING_TYPE
-                                                    }),
-                                            0,
-                                            false,
-                                            null,
-                                            null))
+                                            "content", BasicType.STRING_TYPE, 
0, false, null, null))
                             .build();
         }
         return CatalogTable.of(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 595fe3042e..ebe1ecb45e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -231,6 +231,26 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testTextFormatWithNoSchema(TestContainer container)
+            throws IOException, InterruptedException {
+        try {
+            for (int i = 0; i < 100; i++) {
+                ProducerRecord<byte[], byte[]> producerRecord =
+                        new ProducerRecord<>(
+                                "test_topic_text_no_schema", null, 
"abcdef".getBytes());
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            producer.flush();
+        }
+        Container.ExecResult execResult =
+                
container.executeJob("/textFormatIT/kafka_source_text_with_no_schema.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer 
container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf
new file mode 100644
index 0000000000..bcde96c84e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_text_no_schema"
+    plugin_output = "kafka_table"
+    start_mode = "earliest"
+    format_error_handle_way = fail
+    format = text
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "kafka_table"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = "content"
+            field_type = "string"
+            field_value = [
+              {equals_to = "abcdef"}
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file

Reply via email to