This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new db6e2994d4 [improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642)
db6e2994d4 is described below
commit db6e2994d4efcc8cd3865a0e5dd751645b411800
Author: Jarvis <[email protected]>
AuthorDate: Mon Feb 17 20:29:58 2025 +0800
[improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642)
---
.../seatunnel/kafka/source/KafkaSourceConfig.java | 12 +----
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 20 +++++++
.../kafka_source_text_with_no_schema.conf | 63 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 11 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 981dbc0847..53dc27a673 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -218,16 +217,7 @@ public class KafkaSourceConfig implements Serializable {
TableSchema.builder()
.column(
PhysicalColumn.of(
- "content",
- new SeaTunnelRowType(
- new String[] {"content"},
- new SeaTunnelDataType<?>[]
{
- BasicType.STRING_TYPE
- }),
- 0,
- false,
- null,
- null))
+ "content", BasicType.STRING_TYPE,
0, false, null, null))
.build();
}
return CatalogTable.of(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 595fe3042e..ebe1ecb45e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -231,6 +231,26 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+ @TestTemplate
+ public void testTextFormatWithNoSchema(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ for (int i = 0; i < 100; i++) {
+ ProducerRecord<byte[], byte[]> producerRecord =
+ new ProducerRecord<>(
+ "test_topic_text_no_schema", null,
"abcdef".getBytes());
+ producer.send(producerRecord).get();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ producer.flush();
+ }
+ Container.ExecResult execResult =
+
container.executeJob("/textFormatIT/kafka_source_text_with_no_schema.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
@TestTemplate
public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer
container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf
new file mode 100644
index 0000000000..bcde96c84e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_text_no_schema"
+ plugin_output = "kafka_table"
+ start_mode = "earliest"
+ format_error_handle_way = fail
+ format = text
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "kafka_table"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = "content"
+ field_type = "string"
+ field_value = [
+ {equals_to = "abcdef"}
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file