This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bfb9ba871b2 [test](routine load) add routine load doc case (#43230)
bfb9ba871b2 is described below
commit bfb9ba871b2e710b449717e4b431ff25977951e2
Author: hui lai <[email protected]>
AuthorDate: Wed Nov 6 23:43:02 2024 +0800
[test](routine load) add routine load doc case (#43230)
Add routine load doc case.
---
.../import/import-way/routine-load-manual.md.out | 123 +++
.../import/import-way/test_rl_array.json | 3 +
.../import/import-way/test_rl_bitmap.json | 3 +
.../import/import-way/test_rl_column_mapping.csv | 3 +
.../data-operate/import/import-way/test_rl_csv.csv | 10 +
.../import/import-way/test_rl_delete.csv | 2 +
.../data-operate/import/import-way/test_rl_hll.csv | 8 +
.../import/import-way/test_rl_json.json | 3 +
.../import/import-way/test_rl_json_path.json | 3 +
.../import/import-way/test_rl_json_root.json | 3 +
.../import/import-way/test_rl_map.json | 3 +
.../import/import-way/test_rl_max_filter_ratio.csv | 3 +
.../import/import-way/test_rl_partition.csv | 3 +
.../import-way/routine-load-manual.md.groovy | 1019 ++++++++++++++++++++
14 files changed, 1189 insertions(+)
diff --git
a/regression-test/data/doc/data-operate/import/import-way/routine-load-manual.md.out
b/regression-test/data/doc/data-operate/import/import-way/routine-load-manual.md.out
new file mode 100644
index 00000000000..4c0c1a9e6b7
--- /dev/null
+++
b/regression-test/data/doc/data-operate/import/import-way/routine-load-manual.md.out
@@ -0,0 +1,123 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql1 --
+1 Emily 25
+2 Benjamin 35
+3 Olivia 28
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
+
+-- !sql4 --
+1 Benjamin 18
+2 Emily 20
+
+-- !sql5 --
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
+
+-- !sql6 --
+1 Emily 25
+2 Benjamin 35
+3 Olivia 28
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
+
+-- !sql7 --
+3 Olivia 28
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
+
+-- !sql8 --
+1 Benjamin 18 2024-02-04T10:00
+
+-- !sql9 --
+1 Benjamin 18 2024-02-04T10:00
+2 Emily 20 2024-02-05T11:00
+
+-- !sql10 --
+1 Emily 25
+2 Benjamin 35
+3 Olivia 28
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
+
+-- !sql11 --
+1 Emily 25
+3 Olivia 28
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
+
+-- !sql12 --
+1 Benjamin 18 180
+2 Emily 20 200
+3 Alexander 22 220
+
+-- !sql13 --
+1 Benjamin 18
+2 Emily 20
+3 Alexander 22
+
+-- !sql14 --
+1 Benjamin 18 180
+2 Emily 20 200
+3 Alexander 22 220
+
+-- !sql15 --
+1 Benjamin 18
+2 Emily 20
+3 Alexander 22
+
+-- !sql16 --
+1 Benjamin 18 [1, 2, 3, 4, 5]
+2 Emily 20 [6, 7, 8, 9, 10]
+3 Alexander 22 [11, 12, 13, 14, 15]
+
+-- !sql17 --
+1 Benjamin 18 {"a":100, "b":200}
+2 Emily 20 {"c":300, "d":400}
+3 Alexander 22 {"e":500, "f":600}
+
+-- !sql18 --
+1 Benjamin 18 \N \N
+2 Emily 20 \N \N
+3 Alexander 22 \N \N
+
+-- !sql19 --
+2022-05-05 10001 Test01 Beijing windows \N
+2022-05-06 10001 Test01 Shanghai windows \N
+2022-05-05 10002 Test01 Beijing linux \N
+2022-05-06 10002 Test01 Shanghai linux \N
+2022-05-05 10003 Test01 Beijing macos \N
+2022-05-06 10003 Test01 Jiangsu macos \N
+2022-05-05 10004 Test01 Hebei windows \N
+2022-05-06 10004 Test01 Shaanxi windows \N
+
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_array.json
b/regression-test/data/doc/data-operate/import/import-way/test_rl_array.json
new file mode 100644
index 00000000000..8260cad6b77
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_array.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18, "array":[1,2,3,4,5]}
+{ "id" : 2, "name" : "Emily", "age":20, "array":[6,7,8,9,10]}
+{ "id" : 3, "name" : "Alexander", "age":22, "array":[11,12,13,14,15]}
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_bitmap.json
b/regression-test/data/doc/data-operate/import/import-way/test_rl_bitmap.json
new file mode 100644
index 00000000000..73bb59becb4
--- /dev/null
+++
b/regression-test/data/doc/data-operate/import/import-way/test_rl_bitmap.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18, "bitmap_id":243}
+{ "id" : 2, "name" : "Emily", "age":20, "bitmap_id":28574}
+{ "id" : 3, "name" : "Alexander", "age":22, "bitmap_id":8573}
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_column_mapping.csv
b/regression-test/data/doc/data-operate/import/import-way/test_rl_column_mapping.csv
new file mode 100644
index 00000000000..7eb3f2c7998
--- /dev/null
+++
b/regression-test/data/doc/data-operate/import/import-way/test_rl_column_mapping.csv
@@ -0,0 +1,3 @@
+1,Benjamin,18
+2,Emily,20
+3,Alexander,22
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_csv.csv
b/regression-test/data/doc/data-operate/import/import-way/test_rl_csv.csv
new file mode 100644
index 00000000000..9e401297ab2
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_csv.csv
@@ -0,0 +1,10 @@
+1,Emily,25
+2,Benjamin,35
+3,Olivia,28
+4,Alexander,60
+5,Ava,17
+6,William,69
+7,Sophia,32
+8,James,64
+9,Emma,37
+10,Liam,64
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_delete.csv
b/regression-test/data/doc/data-operate/import/import-way/test_rl_delete.csv
new file mode 100644
index 00000000000..11d1c8468f2
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_delete.csv
@@ -0,0 +1,2 @@
+3,Alexander,22
+5,William,26
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_hll.csv
b/regression-test/data/doc/data-operate/import/import-way/test_rl_hll.csv
new file mode 100644
index 00000000000..a7d55471221
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_hll.csv
@@ -0,0 +1,8 @@
+2022-05-05,10001,Test01,Beijing,windows
+2022-05-05,10002,Test01,Beijing,linux
+2022-05-05,10003,Test01,Beijing,macos
+2022-05-05,10004,Test01,Hebei,windows
+2022-05-06,10001,Test01,Shanghai,windows
+2022-05-06,10002,Test01,Shanghai,linux
+2022-05-06,10003,Test01,Jiangsu,macos
+2022-05-06,10004,Test01,Shaanxi,windows
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_json.json
b/regression-test/data/doc/data-operate/import/import-way/test_rl_json.json
new file mode 100644
index 00000000000..6e06fe67521
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_json.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18 }
+{ "id" : 2, "name" : "Emily", "age":20 }
+{ "id" : 3, "name" : "Alexander", "age":22 }
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_json_path.json
b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_path.json
new file mode 100644
index 00000000000..f96e2cd232b
--- /dev/null
+++
b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_path.json
@@ -0,0 +1,3 @@
+{ "name" : "Benjamin", "id" : 1, "num":180 , "age":18 }
+{ "name" : "Emily", "id" : 2, "num":200 , "age":20 }
+{ "name" : "Alexander", "id" : 3, "num":220 , "age":22 }
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_json_root.json
b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_root.json
new file mode 100644
index 00000000000..60df381670c
--- /dev/null
+++
b/regression-test/data/doc/data-operate/import/import-way/test_rl_json_root.json
@@ -0,0 +1,3 @@
+{"id": 1231, "source" :{ "id" : 1, "name" : "Benjamin", "age":18 }}
+{"id": 1232, "source" :{ "id" : 2, "name" : "Emily", "age":20 }}
+{"id": 1233, "source" :{ "id" : 3, "name" : "Alexander", "age":22 }}
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_map.json
b/regression-test/data/doc/data-operate/import/import-way/test_rl_map.json
new file mode 100644
index 00000000000..d8768982495
--- /dev/null
+++ b/regression-test/data/doc/data-operate/import/import-way/test_rl_map.json
@@ -0,0 +1,3 @@
+{ "id" : 1, "name" : "Benjamin", "age":18, "map":{"a": 100, "b": 200}}
+{ "id" : 2, "name" : "Emily", "age":20, "map":{"c": 300, "d": 400}}
+{ "id" : 3, "name" : "Alexander", "age":22, "map":{"e": 500, "f": 600}}
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_max_filter_ratio.csv
b/regression-test/data/doc/data-operate/import/import-way/test_rl_max_filter_ratio.csv
new file mode 100644
index 00000000000..1e07d97228d
--- /dev/null
+++
b/regression-test/data/doc/data-operate/import/import-way/test_rl_max_filter_ratio.csv
@@ -0,0 +1,3 @@
+1,Benjamin,18
+2,Emily,20
+3,Alexander,dirty_data
\ No newline at end of file
diff --git
a/regression-test/data/doc/data-operate/import/import-way/test_rl_partition.csv
b/regression-test/data/doc/data-operate/import/import-way/test_rl_partition.csv
new file mode 100644
index 00000000000..f41efd837e1
--- /dev/null
+++
b/regression-test/data/doc/data-operate/import/import-way/test_rl_partition.csv
@@ -0,0 +1,3 @@
+1,Benjamin,18,2024-02-04 10:00:00
+2,Emily,20,2024-02-05 11:00:00
+3,Alexander,22,2024-02-06 12:00:00
\ No newline at end of file
diff --git
a/regression-test/suites/doc/data-operate/import/import-way/routine-load-manual.md.groovy
b/regression-test/suites/doc/data-operate/import/import-way/routine-load-manual.md.groovy
new file mode 100644
index 00000000000..b4327a4cb83
--- /dev/null
+++
b/regression-test/suites/doc/data-operate/import/import-way/routine-load-manual.md.groovy
@@ -0,0 +1,1019 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_doc_case","p0") {
+ def kafkaCsvTopics = [
+ "test_rl_csv",
+ "test_rl_max_filter_ratio",
+ "test_rl_partition",
+ "test_rl_delete",
+ "test_rl_column_mapping",
+ "test_rl_hll"
+ ]
+
+ def kafkaJsonTopics = [
+ "test_rl_json",
+ "test_rl_json_path",
+ "test_rl_json_root",
+ "test_rl_array",
+ "test_rl_map",
+ "test_rl_bitmap"
+ ]
+
+ def jsonpaths = [
+ '[\"$.id\",\"$.name\",\"$.age\"]',
+ '[\"$.name\",\"$.id\",\"$.num\",\"$.age\"]',
+ ]
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTopics) {
+ def txt = new
File("""${context.config.dataPath}/doc/data-operate/import/import-way/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ for (String kafkaJsonTopic in kafkaJsonTopics) {
+ def kafkaJson = new
File("""${context.config.dataPath}/doc/data-operate/import/import-way/${kafkaJsonTopic}.json""").text
+ def lines = kafkaJson.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaJsonTopic, null, line)
+ producer.send(record)
+ }
+ }
+
+ // case1: load csv
+ def tableName = "test_routine_load_doc_case"
+ def jobName = "test_routine_load_doc_case_job"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ user_id BIGINT NOT NULL COMMENT "用户 ID",
+ name VARCHAR(20) COMMENT "用户姓名",
+ age INT COMMENT "用户年龄"
+ )
+ UNIQUE KEY(user_id)
+ DISTRIBUTED BY HASH(user_id) BUCKETS 10
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(user_id, name, age)
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql1 "select * from ${tableName} order by user_id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case2: load json
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS(user_id,name,age)
+ PROPERTIES(
+ "format"="json",
+ "jsonpaths"='${jsonpaths[0]}'
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaJsonTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case3: alter routine load
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(user_id, name, age)
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "pause routine load for ${jobName}"
+ sql """
+ ALTER ROUTINE LOAD FOR ${jobName}
+ PROPERTIES(
+ "desired_concurrent_number" = "3"
+ )
+ FROM KAFKA(
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "test-topic"
+ );
+ """
+ sql "stop routine load for ${jobName}"
+
+ //case4: max_filter_ratio
+ def tableName1 = "test_routine_load_doc_case1"
+ sql """ DROP TABLE IF EXISTS ${tableName1} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName1} (
+ id INT NOT NULL COMMENT "User ID",
+ name VARCHAR(30) NOT NULL COMMENT "Name",
+ age INT COMMENT "Age"
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName1}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_filter_ratio"="0.5",
+ "max_error_number" = "100",
+ "strict_mode" = "true"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[1]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName1}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ log.info("url: ${state[0][18].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql4 "select * from ${tableName1} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName1} """
+ }
+
+ //case5: kafka_offsets
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "kafka_partitions" = "0",
+ "kafka_offsets" = "3"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql5 "select * from ${tableName} order by user_id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case6: group.id and client.id
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.group.id" = "kafka_job03",
+ "property.client.id" = "kafka_client_03",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql6 "select * from ${tableName} order by user_id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case7: filter
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ WHERE user_id >= 3
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql7 "select * from ${tableName} order by user_id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ // case8: Loading specified partition data
+ def tableName2 = "test_routine_load_doc_case2"
+ sql """ DROP TABLE IF EXISTS ${tableName2} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName2} (
+ id INT NOT NULL COMMENT "User ID",
+ name VARCHAR(30) NOT NULL COMMENT "Name",
+ age INT COMMENT "Age",
+ date DATETIME COMMENT "Date"
+ )
+ DUPLICATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (PARTITION partition_a VALUES [("0"), ("1")),
+ PARTITION partition_b VALUES [("1"), ("2")),
+ PARTITION partition_c VALUES [("2"), ("3")))
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName2}
+ COLUMNS TERMINATED BY ",",
+ PARTITION(partition_b)
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[2]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName2}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql8 "select * from ${tableName2} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName2} """
+ }
+
+ // case9: timezone
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName2}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "timezone" = "Asia/Shanghai"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[2]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName2}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql9 "select * from ${tableName2} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName2} """
+ }
+
+ // case10: merge delete
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(user_id, name, age)
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ sql "stop routine load for ${jobName}"
+ sql "sync"
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ WITH DELETE
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[3]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql10 "select * from ${tableName} order by user_id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case11: delete on
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ WITH MERGE
+ COLUMNS TERMINATED BY ",",
+ DELETE ON user_id = 2
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql11 "select * from ${tableName} order by user_id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ // case12: Load with column mapping and derived column calculation
+ tableName = "test_routine_load_doc_case3"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id INT NOT NULL COMMENT "id",
+ name VARCHAR(30) NOT NULL COMMENT "name",
+ age INT COMMENT "age",
+ num INT COMMENT "number"
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(id, name, age, num=age*10)
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[4]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql12 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case13: test json
+ tableName = "routine_test12"
+ jobName = "kafka_job12"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id INT NOT NULL COMMENT "id",
+ name VARCHAR(30) NOT NULL COMMENT "name",
+ age INT COMMENT "age"
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ PROPERTIES
+ (
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaJsonTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql13 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case14: json path
+ tableName = "routine_test13"
+ jobName = "kafka_job13"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id INT NOT NULL COMMENT "id",
+ name VARCHAR(30) NOT NULL COMMENT "name",
+ age INT COMMENT "age",
+ num INT COMMENT "num"
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS(name, id, num, age)
+ PROPERTIES
+ (
+ "format" = "json",
+ "jsonpaths"='${jsonpaths[1]}'
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaJsonTopics[1]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql14 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case15: json root
+ tableName = "routine_test14"
+ jobName = "kafka_job14"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id INT NOT NULL COMMENT "id",
+ name VARCHAR(30) NOT NULL COMMENT "name",
+ age INT COMMENT "age"
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ PROPERTIES
+ (
+ "format" = "json",
+ "json_root" = "\$.source"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaJsonTopics[2]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql15 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ // case16: array
+ tableName = "routine_test16"
+ jobName = "kafka_job16"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id INT NOT NULL COMMENT "id",
+ name VARCHAR(30) NOT NULL COMMENT "name",
+ age INT COMMENT "age",
+ array ARRAY<int(11)> NULL COMMENT "test array column"
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ PROPERTIES
+ (
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaJsonTopics[3]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql16 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ // case17: map
+ tableName = "routine_test17"
+ jobName = "kafka_job17"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id INT NOT NULL COMMENT "id",
+ name VARCHAR(30) NOT NULL COMMENT "name",
+ age INT COMMENT "age",
+ map Map<STRING, INT> NULL COMMENT "test column"
+ )
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ PROPERTIES
+ (
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaJsonTopics[4]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql17 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ // case18: bitmap
+ tableName = "routine_test18"
+ jobName = "kafka_job18"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id INT NOT NULL COMMENT "id",
+ name VARCHAR(30) NOT NULL COMMENT "name",
+ age INT COMMENT "age",
+ bitmap_id INT COMMENT "test",
+ device_id BITMAP BITMAP_UNION COMMENT "test column"
+ )
+ AGGREGATE KEY (`id`,`name`,`age`,`bitmap_id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS(id, name, age, bitmap_id,
device_id=to_bitmap(bitmap_id))
+ PROPERTIES
+ (
+ "format" = "json"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaJsonTopics[4]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql18 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ // case19: hll
+ tableName = "routine_test19"
+ jobName = "kafka_job19"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ dt DATE,
+ id INT,
+ name VARCHAR(10),
+ province VARCHAR(10),
+ os VARCHAR(10),
+ pv hll hll_union
+ )
+ Aggregate KEY (dt,id,name,province,os)
+ distributed by hash(id) buckets 10
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(dt, id, name, province, os, pv=hll_hash(id))
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[5]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ log.info("url: ${state[0][18].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql19 "select * from ${tableName} order by id"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ truncate table ${tableName} """
+ }
+
+ //case 19: Single-task Loading to Multiple Tables
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName}
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[5]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+
+ //case20: strict mode
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName}
+ PROPERTIES
+ (
+ "strict_mode" = "true"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTopics[5]}"
+ );
+ """
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]