This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 93d38d5f46 [Improve][Core] Add protobuf transform test case (#7914)
93d38d5f46 is described below

commit 93d38d5f46aa4da71a203a9a9b2ae5d4e6cdec90
Author: Jast <[email protected]>
AuthorDate: Sat Oct 26 02:46:27 2024 +0800

    [Improve][Core] Add protobuf transform test case (#7914)
---
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 108 +++++++++++++++-----
 .../kafka_protobuf_transform_to_assert.conf        | 109 +++++++++++++++++++++
 2 files changed, 195 insertions(+), 22 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 4a57cbdbd3..986e5f9f2e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import org.apache.seatunnel.e2e.common.TestResource;
@@ -64,6 +65,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -693,30 +695,11 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         ProtobufDeserializationSchema deserializationSchema =
                 new ProtobufDeserializationSchema(catalogTable);
 
-        // Create serializer
         DefaultSeaTunnelRowSerializer serializer =
-                DefaultSeaTunnelRowSerializer.create(
-                        "test_protobuf_topic_fake_source",
-                        seaTunnelRowType,
-                        MessageFormat.PROTOBUF,
-                        DEFAULT_FIELD_DELIMITER,
-                        readonlyConfig);
-
-        // Produce records to Kafka
-        IntStream.range(0, 20)
-                .forEach(
-                        i -> {
-                            try {
-                                SeaTunnelRow originalRow = buildSeaTunnelRow();
-                                ProducerRecord<byte[], byte[]> producerRecord =
-                                        serializer.serializeRow(originalRow);
-                                producer.send(producerRecord).get();
-                            } catch (InterruptedException | ExecutionException e) {
-                                throw new RuntimeException("Error sending Kafka message", e);
-                            }
-                        });
+                getDefaultSeaTunnelRowSerializer(
+                        "test_protobuf_topic_fake_source", seaTunnelRowType, readonlyConfig);
 
-        producer.flush();
+        sendData(serializer);
 
         // Execute the job and validate
         Container.ExecResult execResult = container.executeJob(confFile);
@@ -769,6 +752,87 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                 });
     }
 
+    private @NotNull DefaultSeaTunnelRowSerializer getDefaultSeaTunnelRowSerializer(
+            String topic, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig readonlyConfig) {
+        // Create serializer
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic,
+                        seaTunnelRowType,
+                        MessageFormat.PROTOBUF,
+                        DEFAULT_FIELD_DELIMITER,
+                        readonlyConfig);
+        return serializer;
+    }
+
+    private void sendData(DefaultSeaTunnelRowSerializer serializer) {
+        // Produce records to Kafka
+        IntStream.range(0, 20)
+                .forEach(
+                        i -> {
+                            try {
+                                SeaTunnelRow originalRow = buildSeaTunnelRow();
+                                ProducerRecord<byte[], byte[]> producerRecord =
+                                        serializer.serializeRow(originalRow);
+                                producer.send(producerRecord).get();
+                            } catch (InterruptedException | ExecutionException e) {
+                                throw new RuntimeException("Error sending Kafka message", e);
+                            }
+                        });
+
+        producer.flush();
+    }
+
+    @TestTemplate
+    public void testKafkaProtobufForTransformToAssert(TestContainer container)
+            throws IOException, InterruptedException, URISyntaxException {
+
+        String confFile = "/protobuf/kafka_protobuf_transform_to_assert.conf";
+        String path = getTestConfigFile(confFile);
+        Config config = ConfigFactory.parseFile(new File(path));
+        Config sinkConfig = config.getConfigList("source").get(0);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
+        SeaTunnelRowType seaTunnelRowType = buildSeaTunnelRowType();
+
+        // Create serializer
+        DefaultSeaTunnelRowSerializer serializer =
+                getDefaultSeaTunnelRowSerializer(
+                        "test_protobuf_topic_transform_fake_source",
+                        seaTunnelRowType,
+                        readonlyConfig);
+
+        // Produce records to Kafka
+        sendData(serializer);
+
+        // Execute the job and validate
+        Container.ExecResult execResult = container.executeJob(confFile);
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        try (KafkaConsumer<byte[], byte[]> consumer =
+                new KafkaConsumer<>(kafkaByteConsumerConfig())) {
+            consumer.subscribe(Arrays.asList("verify_protobuf_transform"));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(
+                            Arrays.asList(new TopicPartition("verify_protobuf_transform", 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        String data = new String(record.value(), "UTF-8");
+                        ObjectNode jsonNodes = JsonUtils.parseObject(data);
+                        Assertions.assertEquals(jsonNodes.size(), 2);
+                        Assertions.assertEquals(jsonNodes.get("city").asText(), "city_value");
+                        Assertions.assertEquals(jsonNodes.get("c_string").asText(), "test data");
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+    }
+
     public static String getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         URL resource = KafkaIT.class.getResource(configFile);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf
new file mode 100644
index 0000000000..1a48db8c5f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+    Kafka {
+        topic = "test_protobuf_topic_transform_fake_source"
+        format = protobuf
+        protobuf_message_name = Person
+        protobuf_schema = """
+            syntax = "proto3";
+
+            package org.apache.seatunnel.format.protobuf;
+
+            option java_outer_classname = "ProtobufE2E";
+
+            message Person {
+                int32 c_int32 = 1;
+                int64 c_int64 = 2;
+                float c_float = 3;
+                double c_double = 4;
+                bool c_bool = 5;
+                string c_string = 6;
+                bytes c_bytes = 7;
+
+                message Address {
+                    string street = 1;
+                    string city = 2;
+                    string state = 3;
+                    string zip = 4;
+                }
+
+                Address address = 8;
+
+                map<string, float> attributes = 9;
+
+                repeated string phone_numbers = 10;
+            }
+        """
+        schema = {
+            fields {
+                c_int32 = int
+                c_int64 = long
+                c_float = float
+                c_double = double
+                c_bool = boolean
+                c_string = string
+                c_bytes = bytes
+
+                Address {
+                    city = string
+                    state = string
+                    street = string
+                }
+                attributes = "map<string,float>"
+                phone_numbers = "array<string>"
+            }
+        }
+        bootstrap.servers = "kafkaCluster:9092"
+        start_mode = "earliest"
+        result_table_name = "kafka_table"
+    }
+}
+
+transform {
+    Sql {
+        source_table_name = "kafka_table"
+        result_table_name = "kafka_table_transform"
+        query = "select Address.city,c_string from kafka_table"
+    }
+}
+
+sink {
+  kafka {
+      topic = "verify_protobuf_transform"
+      source_table_name = "kafka_table_transform"
+      bootstrap.servers = "kafkaCluster:9092"
+      kafka.request.timeout.ms = 60000
+      kafka.config = {
+        acks = "all"
+        request.timeout.ms = 60000
+        buffer.memory = 33554432
+      }
+
+  }
+}

Reply via email to