This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new ecb9b9e  [test](e2e) add e2e test for multiple transforms chain (#72)
ecb9b9e is described below

commit ecb9b9e63b5716e1a939c9844002bd73943557ec
Author: wangchuang <[email protected]>
AuthorDate: Fri May 9 09:59:35 2025 +0800

    [test](e2e) add e2e test for multiple transforms chain (#72)
---
 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 25 ++++++++++++++++++++
 .../e2e/transforms/multiple_transforms_chain.json  | 27 ++++++++++++++++++++++
 .../e2e/transforms/multiple_transforms_chain.sql   | 11 +++++++++
 3 files changed, 63 insertions(+)

diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 9ec116f..6d073c2 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -306,6 +306,31 @@ public class StringMsgE2ETest extends 
AbstractStringE2ESinkTest {
         checkResult(expectedResult, query1, 3);
     }
 
+    @Test
+    public void testMultipleTransformsChain() throws Exception {
+        
initialize("src/test/resources/e2e/transforms/multiple_transforms_chain.json");
+        String topic = "multiple_transforms_chain_test";
+        String msg1 = "{\"type\": 
\"INSERT\",\"content\":{\"id\":1,\"old_col1\":\"col1\"}}";
+        String msg2 = "{\"type\": 
\"INSERT\",\"content\":{\"id\":2,\"old_col1\":\"col1\"}}";
+        produceMsg2Kafka(topic, msg1);
+        produceMsg2Kafka(topic, msg2);
+
+        String tableSql1 =
+                
loadContent("src/test/resources/e2e/transforms/multiple_transforms_chain.sql");
+        createTable(tableSql1);
+
+        Thread.sleep(2000);
+        kafkaContainerService.registerKafkaConnector(connectorName, 
jsonMsgConnectorContent);
+
+        List<String> expectedResult = Arrays.asList("1,col1", "2,col1");
+        Thread.sleep(10000);
+        String query1 =
+                String.format(
+                        "select id,col1 from %s.%s order by id",
+                        database, "multiple_transforms_chain_tab");
+        checkResult(expectedResult, query1, 2);
+    }
+
     @Test
     public void testTopicMutatingSmt() throws Exception {
         
initialize("src/test/resources/e2e/transforms/regex_router_transforms.json");
diff --git a/src/test/resources/e2e/transforms/multiple_transforms_chain.json 
b/src/test/resources/e2e/transforms/multiple_transforms_chain.json
new file mode 100644
index 0000000..f61e8e5
--- /dev/null
+++ b/src/test/resources/e2e/transforms/multiple_transforms_chain.json
@@ -0,0 +1,27 @@
+{
+  "name":"multiple_transforms_chain_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"multiple_transforms_chain_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": 
"multiple_transforms_chain_test:multiple_transforms_chain_tab",
+    "buffer.count.records":"2",
+    "buffer.flush.time":"11",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"transforms_msg",
+    "load.model":"stream_load",
+    "transforms":"extractField,renameField",
+    "transforms.extractField.type": 
"org.apache.kafka.connect.transforms.ExtractField$Value",
+    "transforms.extractField.field": "content",
+    
"transforms.renameField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
+    "transforms.renameField.renames":"old_col1:col1",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/transforms/multiple_transforms_chain.sql 
b/src/test/resources/e2e/transforms/multiple_transforms_chain.sql
new file mode 100644
index 0000000..5536049
--- /dev/null
+++ b/src/test/resources/e2e/transforms/multiple_transforms_chain.sql
@@ -0,0 +1,11 @@
+-- Please note that the database here should be consistent with doris.database 
in the file where the connector is registered.
+CREATE TABLE transforms_msg.multiple_transforms_chain_tab (
+  id INT NULL,
+  col1 VARCHAR(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to