This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 93d38d5f46 [Improve][Core] Add protobuf transform test case (#7914)
93d38d5f46 is described below
commit 93d38d5f46aa4da71a203a9a9b2ae5d4e6cdec90
Author: Jast <[email protected]>
AuthorDate: Sat Oct 26 02:46:27 2024 +0800
[Improve][Core] Add protobuf transform test case (#7914)
---
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 108 +++++++++++++++-----
.../kafka_protobuf_transform_to_assert.conf | 109 +++++++++++++++++++++
2 files changed, 195 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 4a57cbdbd3..986e5f9f2e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.e2e.common.TestResource;
@@ -64,6 +65,7 @@ import
org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -693,30 +695,11 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
ProtobufDeserializationSchema deserializationSchema =
new ProtobufDeserializationSchema(catalogTable);
- // Create serializer
DefaultSeaTunnelRowSerializer serializer =
- DefaultSeaTunnelRowSerializer.create(
- "test_protobuf_topic_fake_source",
- seaTunnelRowType,
- MessageFormat.PROTOBUF,
- DEFAULT_FIELD_DELIMITER,
- readonlyConfig);
-
- // Produce records to Kafka
- IntStream.range(0, 20)
- .forEach(
- i -> {
- try {
- SeaTunnelRow originalRow = buildSeaTunnelRow();
- ProducerRecord<byte[], byte[]> producerRecord =
- serializer.serializeRow(originalRow);
- producer.send(producerRecord).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Error sending Kafka message", e);
- }
- });
+ getDefaultSeaTunnelRowSerializer(
+ "test_protobuf_topic_fake_source", seaTunnelRowType, readonlyConfig);
- producer.flush();
+ sendData(serializer);
// Execute the job and validate
Container.ExecResult execResult = container.executeJob(confFile);
@@ -769,6 +752,87 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
});
}
+ private @NotNull DefaultSeaTunnelRowSerializer getDefaultSeaTunnelRowSerializer(
+ String topic, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig readonlyConfig) {
+ // Create serializer
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ topic,
+ seaTunnelRowType,
+ MessageFormat.PROTOBUF,
+ DEFAULT_FIELD_DELIMITER,
+ readonlyConfig);
+ return serializer;
+ }
+
+ private void sendData(DefaultSeaTunnelRowSerializer serializer) {
+ // Produce records to Kafka
+ IntStream.range(0, 20)
+ .forEach(
+ i -> {
+ try {
+ SeaTunnelRow originalRow = buildSeaTunnelRow();
+ ProducerRecord<byte[], byte[]> producerRecord =
+ serializer.serializeRow(originalRow);
+ producer.send(producerRecord).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Error sending Kafka message", e);
+ }
+ });
+
+ producer.flush();
+ }
+
+ @TestTemplate
+ public void testKafkaProtobufForTransformToAssert(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+
+ String confFile = "/protobuf/kafka_protobuf_transform_to_assert.conf";
+ String path = getTestConfigFile(confFile);
+ Config config = ConfigFactory.parseFile(new File(path));
+ Config sinkConfig = config.getConfigList("source").get(0);
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
+ SeaTunnelRowType seaTunnelRowType = buildSeaTunnelRowType();
+
+ // Create serializer
+ DefaultSeaTunnelRowSerializer serializer =
+ getDefaultSeaTunnelRowSerializer(
+ "test_protobuf_topic_transform_fake_source",
+ seaTunnelRowType,
+ readonlyConfig);
+
+ // Produce records to Kafka
+ sendData(serializer);
+
+ // Execute the job and validate
+ Container.ExecResult execResult = container.executeJob(confFile);
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+ try (KafkaConsumer<byte[], byte[]> consumer =
+ new KafkaConsumer<>(kafkaByteConsumerConfig())) {
+ consumer.subscribe(Arrays.asList("verify_protobuf_transform"));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(
+ Arrays.asList(new TopicPartition("verify_protobuf_transform", 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ String data = new String(record.value(), "UTF-8");
+ ObjectNode jsonNodes = JsonUtils.parseObject(data);
+ Assertions.assertEquals(jsonNodes.size(), 2);
+ Assertions.assertEquals(jsonNodes.get("city").asText(), "city_value");
+ Assertions.assertEquals(jsonNodes.get("c_string").asText(), "test data");
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ }
+
public static String getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
URL resource = KafkaIT.class.getResource(configFile);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf
new file mode 100644
index 0000000000..1a48db8c5f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ topic = "test_protobuf_topic_transform_fake_source"
+ format = protobuf
+ protobuf_message_name = Person
+ protobuf_schema = """
+ syntax = "proto3";
+
+ package org.apache.seatunnel.format.protobuf;
+
+ option java_outer_classname = "ProtobufE2E";
+
+ message Person {
+ int32 c_int32 = 1;
+ int64 c_int64 = 2;
+ float c_float = 3;
+ double c_double = 4;
+ bool c_bool = 5;
+ string c_string = 6;
+ bytes c_bytes = 7;
+
+ message Address {
+ string street = 1;
+ string city = 2;
+ string state = 3;
+ string zip = 4;
+ }
+
+ Address address = 8;
+
+ map<string, float> attributes = 9;
+
+ repeated string phone_numbers = 10;
+ }
+ """
+ schema = {
+ fields {
+ c_int32 = int
+ c_int64 = long
+ c_float = float
+ c_double = double
+ c_bool = boolean
+ c_string = string
+ c_bytes = bytes
+
+ Address {
+ city = string
+ state = string
+ street = string
+ }
+ attributes = "map<string,float>"
+ phone_numbers = "array<string>"
+ }
+ }
+ bootstrap.servers = "kafkaCluster:9092"
+ start_mode = "earliest"
+ result_table_name = "kafka_table"
+ }
+}
+
+transform {
+ Sql {
+ source_table_name = "kafka_table"
+ result_table_name = "kafka_table_transform"
+ query = "select Address.city,c_string from kafka_table"
+ }
+}
+
+sink {
+ kafka {
+ topic = "verify_protobuf_transform"
+ source_table_name = "kafka_table_transform"
+ bootstrap.servers = "kafkaCluster:9092"
+ kafka.request.timeout.ms = 60000
+ kafka.config = {
+ acks = "all"
+ request.timeout.ms = 60000
+ buffer.memory = 33554432
+ }
+
+ }
+}