This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new ecb9b9e [test](e2e) add e2e test for multiple transforms chain (#72)
ecb9b9e is described below
commit ecb9b9e63b5716e1a939c9844002bd73943557ec
Author: wangchuang <[email protected]>
AuthorDate: Fri May 9 09:59:35 2025 +0800
[test](e2e) add e2e test for multiple transforms chain (#72)
---
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 25 ++++++++++++++++++++
.../e2e/transforms/multiple_transforms_chain.json | 27 ++++++++++++++++++++++
.../e2e/transforms/multiple_transforms_chain.sql | 11 +++++++++
3 files changed, 63 insertions(+)
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 9ec116f..6d073c2 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -306,6 +306,31 @@ public class StringMsgE2ETest extends
AbstractStringE2ESinkTest {
checkResult(expectedResult, query1, 3);
}
+ @Test
+ public void testMultipleTransformsChain() throws Exception {
+
initialize("src/test/resources/e2e/transforms/multiple_transforms_chain.json");
+ String topic = "multiple_transforms_chain_test";
+ String msg1 = "{\"type\":
\"INSERT\",\"content\":{\"id\":1,\"old_col1\":\"col1\"}}";
+ String msg2 = "{\"type\":
\"INSERT\",\"content\":{\"id\":2,\"old_col1\":\"col1\"}}";
+ produceMsg2Kafka(topic, msg1);
+ produceMsg2Kafka(topic, msg2);
+
+ String tableSql1 =
+
loadContent("src/test/resources/e2e/transforms/multiple_transforms_chain.sql");
+ createTable(tableSql1);
+
+ Thread.sleep(2000);
+ kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+
+ List<String> expectedResult = Arrays.asList("1,col1", "2,col1");
+ Thread.sleep(10000);
+ String query1 =
+ String.format(
+ "select id,col1 from %s.%s order by id",
+ database, "multiple_transforms_chain_tab");
+ checkResult(expectedResult, query1, 2);
+ }
+
@Test
public void testTopicMutatingSmt() throws Exception {
initialize("src/test/resources/e2e/transforms/regex_router_transforms.json");
diff --git a/src/test/resources/e2e/transforms/multiple_transforms_chain.json
b/src/test/resources/e2e/transforms/multiple_transforms_chain.json
new file mode 100644
index 0000000..f61e8e5
--- /dev/null
+++ b/src/test/resources/e2e/transforms/multiple_transforms_chain.json
@@ -0,0 +1,27 @@
+{
+ "name":"multiple_transforms_chain_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"multiple_transforms_chain_test",
+ "tasks.max":"1",
+ "doris.topic2table.map":
"multiple_transforms_chain_test:multiple_transforms_chain_tab",
+ "buffer.count.records":"2",
+ "buffer.flush.time":"11",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"transforms_msg",
+ "load.model":"stream_load",
+ "transforms":"extractField,renameField",
+ "transforms.extractField.type":
"org.apache.kafka.connect.transforms.ExtractField$Value",
+ "transforms.extractField.field": "content",
+
"transforms.renameField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
+ "transforms.renameField.renames":"old_col1:col1",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.json.JsonConverter",
+ "value.converter.schemas.enable": "false"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/transforms/multiple_transforms_chain.sql
b/src/test/resources/e2e/transforms/multiple_transforms_chain.sql
new file mode 100644
index 0000000..5536049
--- /dev/null
+++ b/src/test/resources/e2e/transforms/multiple_transforms_chain.sql
@@ -0,0 +1,11 @@
+-- Please note that the database here should be consistent with doris.database
in the file where the connector is registered.
+CREATE TABLE transforms_msg.multiple_transforms_chain_tab (
+ id INT NULL,
+ col1 VARCHAR(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]