This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f734e8ecc01 branch-3.1: [fix](case)update routine load cases to avoid kafka case hang #53214 (#53921)
f734e8ecc01 is described below
commit f734e8ecc01d0ae26ea6a050d74422dfbeb7ddde
Author: MoanasDaddyXu <[email protected]>
AuthorDate: Tue Jul 29 08:24:24 2025 +0800
    branch-3.1: [fix](case)update routine load cases to avoid kafka case hang #53214 (#53921)
picked from #53214
---
.../load_p0/routine_load/test_black_list.groovy | 26 +++
.../load_p0/routine_load/test_disable_load.groovy | 26 +++
...test_multi_table_load_data_quality_error.groovy | 25 +++
.../test_multi_table_load_error.groovy | 25 +++
.../routine_load/test_out_of_range_error.groovy | 25 +++
.../test_routin_load_abnormal_job_monitor.groovy | 25 +++
.../routine_load/test_routine_load_alter.groovy | 25 +++
.../routine_load/test_routine_load_eof.groovy | 25 +++
.../routine_load/test_routine_load_error.groovy | 25 +++
.../test_routine_load_error_info.groovy | 26 +++
.../test_routine_load_follower_fe.groovy | 178 +++++++++++++++++++++
.../routine_load/test_routine_load_metrics.groovy | 25 +++
.../routine_load/test_routine_load_offset.groovy | 25 +++
.../routine_load/test_routine_load_progress.groovy | 26 +++
.../routine_load/test_routine_load_property.groovy | 25 +++
.../test_routine_load_restart_fe.groovy | 25 +++
.../routine_load/test_routine_load_schedule.groovy | 25 +++
.../test_routine_load_timeout_value.groovy | 26 +++
.../test_routine_load_topic_change.groovy | 25 +++
.../routine_load/test_routine_load_with_sc.groovy | 25 +++
.../routine_load/test_routine_load_with_udf.groovy | 25 +++
.../test_routine_load_with_user.groovy | 25 +++
.../routine_load/test_show_routine_load.groovy | 25 +++
23 files changed, 733 insertions(+)
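Note on the recurring change: each suite now sets bounded producer timeouts and probes broker metadata before producing, so an unreachable Kafka broker fails the case within seconds instead of hanging it. A minimal standalone sketch of that pattern follows (the broker address and probe topic here are placeholders, not values taken from the suites, which read theirs from the test config):

    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerConfig

    // Placeholder broker address; the suites build theirs from externalEnvIp/kafka_port.
    def broker = "127.0.0.1:9092"
    def props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // max.block.ms caps how long send()/partitionsFor() may block on metadata;
    // request.timeout.ms caps how long an in-flight request may wait for a reply.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")

    def producer = new KafkaProducer<String, String>(props)
    try {
        // partitionsFor() forces a metadata round trip and throws once max.block.ms
        // elapses with no reachable broker -- the fast failure the fix relies on.
        def partitions = producer.partitionsFor("__connection_verification_topic")
        assert partitions != null
    } catch (Exception e) {
        producer.close()
        throw new Exception("Kafka connect fail: ${e.message}".toString())
    }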
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
index 04779f10362..29af1600932 100644
--- a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -34,7 +34,33 @@ suite("test_black_list","nonConcurrent,p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_disable_load.groovy b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
index 022a9ae92d0..38651775ab0 100644
--- a/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
@@ -34,7 +34,33 @@ suite("test_disable_load","nonConcurrent,p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
index 549dbdf3f59..fbd334a1455 100644
--- a/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
@@ -34,8 +34,33 @@ suite("test_multi_table_load_data_quality_error","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
index 039e734466e..7ffa6efc149 100644
--- a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -37,8 +37,33 @@ suite("test_multi_table_load_eror","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
index 1ae74b73301..c16a1b0dae0 100644
--- a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
@@ -36,8 +36,33 @@ suite("test_out_of_range","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
index 7cc08b5b813..4cc3a743bec 100644
--- a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
@@ -38,8 +38,33 @@ suite("test_routine_load_abnormal_job_monitor","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
index 0089aff61e5..b1d418180eb 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
@@ -34,8 +34,33 @@ suite("test_routine_load_alter","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index ac0b08248ef..b200cb22e54 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -37,8 +37,33 @@ suite("test_routine_load_eof","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         def count = 0
         while(true) {
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 844d4e5a183..d103dc5eb0f 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -40,8 +40,33 @@ suite("test_routine_load_error","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2f018a93729..7394d72c41d 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -36,7 +36,33 @@ suite("test_routine_load_error_info","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy
new file mode 100644
index 00000000000..8f7ed9a4cf6
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_follower_fe","docker") {
+ def options = new ClusterOptions()
+ // Configure 3 FE nodes cluster
+ options.setFeNum(3)
+ options.setBeNum(1)
+
+ docker(options) {
+ def kafkaCsvTpoics = [
+ "test_routine_load_follower_fe",
+ ]
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // 1. send data to kafka
+ def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            // add timeout config
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+            // check connection
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connect fail: ${e.message}".toString())
+                }
+            }
+            // Create kafka producer
+            def producer = new KafkaProducer<>(props)
+            try {
+                logger.info("Kafka connecting: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("can't get any kafka info")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e
+            }
+            logger.info("Kafka connect success")
+
+ // Send test data to kafka topic
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ // Create simple test data
+                def testData = [
+                    "1,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1",
+                    "2,test_data_2,2023-01-02,value2,2023-01-02 11:00:00,extra2",
+                    "3,test_data_3,2023-01-03,value3,2023-01-03 12:00:00,extra3",
+                    "4,test_data_4,2023-01-04,value4,2023-01-04 13:00:00,extra4",
+                    "5,test_data_5,2023-01-05,value5,2023-01-05 14:00:00,extra5"
+                ]
+
+                testData.each { line ->
+                    logger.info("Sending data to kafka: ${line}")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+ producer.close()
+
+ // 3. Connect to a follower FE and create table
+ def masterFe = cluster.getMasterFe()
+ def allFes = cluster.getAllFrontends()
+            def followerFes = allFes.findAll { fe -> fe.index != masterFe.index }
+            def followerFe = followerFes[0]
+            logger.info("Master FE: ${masterFe.host}")
+            logger.info("Using follower FE: ${followerFe.host}")
+            // Connect to follower FE
+            def url = String.format(
+                    "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
+                    followerFe.host, followerFe.queryPort)
+            logger.info("Connecting to follower FE: ${url}")
+            context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword)
+
+ sql "drop database if exists test_routine_load_follower_fe"
+ sql "create database test_routine_load_follower_fe"
+ sql "use test_routine_load_follower_fe"
+ def tableName = "test_routine_load_follower_fe"
+ def job = "test_follower_routine_load"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ // 4. Create routine load job on follower FE
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "20",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.group.id" = "test-follower-consumer-group",
+ "property.client.id" = "test-follower-client-id",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // 5. Wait for routine load to process data
+ def count = 0
+ def maxWaitCount = 60 // Wait up to 60 seconds
+ while (count < maxWaitCount) {
+ def state = sql "show routine load for ${job}"
+ def routineLoadState = state[0][8].toString()
+ def statistic = state[0][14].toString()
+ logger.info("Routine load state: ${routineLoadState}")
+ logger.info("Routine load statistic: ${statistic}")
+
+ def rowCount = sql "select count(*) from ${tableName}"
+                    // Check if routine load is running and has processed some data
+ if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
+ break
+ }
+
+ sleep(1000)
+ count++
+ }
+ } catch (Exception e) {
+ logger.error("Test failed with exception: ${e.message}")
+ } finally {
+ try {
+ sql "stop routine load for ${job}"
+ } catch (Exception e) {
+                    logger.warn("Failed to stop routine load job: ${e.message}")
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
index bb1afb6dd34..33b5166a9a8 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
@@ -38,8 +38,33 @@ suite("test_routine_load_metrics","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
index 84d0509cea3..1280d3dbe4c 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
@@ -34,8 +34,33 @@ suite("test_routine_load_offset","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
index c372c5826b2..a353b4da7b1 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
@@ -40,7 +40,33 @@ suite("test_routine_load_progress","docker") {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            // add timeout config
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+            // check connection
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connect fail: ${e.message}".toString())
+                }
+            }
+            // Create kafka producer
             def producer = new KafkaProducer<>(props)
+            try {
+                logger.info("Kafka connecting: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("can't get any kafka info")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e
+            }
+            logger.info("Kafka connect success")
             for (String kafkaCsvTopic in kafkaCsvTpoics) {
                 def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
                 def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
index 9cc1fa0d2d9..cd10af8dfe5 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
@@ -36,8 +36,33 @@ suite("test_routine_load_property","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
index 104026fb16e..cedbf998e47 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
@@ -38,8 +38,33 @@ suite("test_routine_load_restart_fe", "docker") {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            // add timeout config
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+            // check connection
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connect fail: ${e.message}".toString())
+                }
+            }
             // Create kafka producer
             def producer = new KafkaProducer<>(props)
+            try {
+                logger.info("Kafka connecting: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("can't get any kafka info")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e
+            }
+            logger.info("Kafka connect success")
             for (String kafkaCsvTopic in kafkaCsvTpoics) {
                 def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
index d0f01fc3d04..2bb08c149e5 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
@@ -37,8 +37,33 @@ suite("test_routine_load_schedule","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
index 873b9458572..937dba4424a 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
@@ -36,7 +36,33 @@ suite("test_routine_load_timeout_value","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
index 25bf9933d11..09a1f970e2f 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -36,8 +36,33 @@ suite("test_routine_load_topic_change","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
index 33c047062dd..02220a2d1b8 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -36,8 +36,33 @@ suite("test_routine_load_with_sc","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
index 9f4ae866e99..9bbb7ee208d 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
@@ -34,8 +34,33 @@ suite("test_routine_load_with_udf","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
index 3611e1cc0d6..754c5d82aae 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
@@ -39,8 +39,33 @@ suite("test_routine_load_with_user","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
index d6b31db11f9..ac3d14ffe37 100644
--- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -34,8 +34,33 @@ suite("test_show_routine_load","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]