This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 24cab5e5133 branch-3.1: [regression](load) add a case for using the
'$.' symbol in routine load #53442 (#53602)
24cab5e5133 is described below
commit 24cab5e5133acbb9bfc443712a0e8b46efa2e7d9
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jul 21 15:01:17 2025 +0800
branch-3.1: [regression](load) add a case for using the '$.' symbol in
routine load #53442 (#53602)
Cherry-picked from #53442
Co-authored-by: lw112 <[email protected]>
---
.../test_routine_load_jsonpath_dollar_job.json | 3 +
.../test_routine_load_jsonpath_dollar.groovy | 159 +++++++++++++++++++++
2 files changed, 162 insertions(+)
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json b/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json
new file mode 100644
index 00000000000..86a8a9b2bcf
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json
@@ -0,0 +1,3 @@
+{"time": 1752600673, "id": 1, "name": "test1", "extra": "field1"}
+{"time": 1752600674, "id": 2, "name": "test2", "extra": "field2"}
+{"time": 1752600675, "id": 3, "name": "test3", "extra": "field3"}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
new file mode 100644
index 00000000000..a4ade95f27d
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_jsonpath_dollar", "p0") {
+    def tableName = "test_routine_load_jsonpath_dollar"
+    def jobName = "test_routine_load_jsonpath_dollar_job"
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // Each line of the data file is sent as one Kafka message.
+        def kafkaJson = new File("""${context.file.parent}/data/${jobName}.json""").text
+        def lines = kafkaJson.readLines()
+        // The bundled data file has one distinct (time, id) pair per line and the table is
+        // UNIQUE KEY(time, id), so the table should end up with exactly one row per line,
+        // even if the topic still holds the same messages from earlier runs.
+        def expectedRows = lines.size()
+
+        // Send test data to Kafka; the producer is closed even if a send fails.
+        def props = new Properties()
+        props.put("bootstrap.servers", "${externalEnvIp}:${kafka_port}".toString())
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        try {
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(jobName, null, line)
+                // Wait for the ack so a send failure surfaces here, not as a load timeout later.
+                producer.send(record).get()
+            }
+        } finally {
+            producer.close()
+        }
+
+        try {
+            sql """
+                DROP TABLE IF EXISTS ${tableName}
+            """
+
+            sql """
+                CREATE TABLE ${tableName} (
+                    time DATETIME,
+                    id INT,
+                    name VARCHAR(50),
+                    content TEXT
+                )
+                UNIQUE KEY(time, id)
+                DISTRIBUTED BY HASH(time, id) BUCKETS 1
+                PROPERTIES (
+                    "replication_num" = "1"
+                );
+            """
+
+            // Create routine load job with $. in jsonpaths: the 4th path ("$.") feeds the
+            // whole JSON message into `content`. "timezone" pins from_unixtime() so the
+            // DATETIME asserted below does not depend on the cluster/session time zone.
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS(ot,time=from_unixtime(`ot`), id, name, content),
+                PRECEDING FILTER ((`ot` > 0) AND (`id` != ''))
+                PROPERTIES
+                (
+                    "format" = "json",
+                    "jsonpaths" = '[\"\$.time\", \"\$.id\", \"\$.name\", \"\$.\"]',
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200",
+                    "max_error_number" = "0",
+                    "strip_outer_array" = "false",
+                    "strict_mode" = "false",
+                    "timezone" = "Asia/Shanghai"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${jobName}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "sync"
+
+            // Wait (about 60 seconds) for the routine load job to reach RUNNING.
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                log.info("routine load state: ${state}")
+                if (state == "RUNNING") {
+                    break
+                }
+                if (count >= 60) {
+                    log.error("routine load failed to start after 60 seconds")
+                    assertEquals("RUNNING", state)
+                    break
+                }
+                count++
+            }
+
+            // Wait (about 5 minutes) until every row of the data file is visible,
+            // not just the first one, so the exact-count assertion below is stable.
+            count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}")
+                log.info("routine load statistic: ${state[0][14].toString()}")
+                if (res[0][0] >= expectedRows) {
+                    break
+                }
+                if (count >= 60) {
+                    log.error("routine load can not load data for long time")
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+
+            sql "sync"
+            def result = sql "select * from ${tableName} order by time, id"
+            log.info("Loaded data: ${result}")
+
+            def rowCount = sql "select count(*) from ${tableName}"
+            assertTrue(rowCount[0][0] > 0, "No data was loaded")
+            assertEquals(expectedRows as long, rowCount[0][0] as long)
+
+            def contentCheck = sql "select content from ${tableName} where id = 1"
+            assertTrue(contentCheck.size() > 0, "No data found for id = 1")
+            def jsonContent = contentCheck[0][0].toString()
+            assertTrue(jsonContent.contains("test1"), "Content should contain the full JSON with 'test1'")
+            assertTrue(jsonContent.contains("field1"), "Content should contain the full JSON with 'field1'")
+            assertTrue(jsonContent.contains("time"), "Content should contain the full JSON with 'time' field")
+
+            // 1752600673 is 2025-07-15 17:31:13 UTC, i.e. 2025-07-16 01:31:13 at +08:00 (Asia/Shanghai).
+            def specificData = sql "select date_format(time, '%Y-%m-%dT%H:%i:%s'), id, name from ${tableName} where id = 1"
+            assertEquals("2025-07-16T01:31:13", specificData[0][0])
+            assertEquals(1, specificData[0][1])
+            assertEquals("test1", specificData[0][2])
+        } finally {
+            try {
+                sql "stop routine load for ${jobName}"
+            } catch (Exception e) {
+                log.info("Stop routine load failed: ${e.getMessage()}")
+            }
+
+            try {
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            } catch (Exception e) {
+                log.info("Drop table failed: ${e.getMessage()}")
+            }
+        }
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]