This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f734e8ecc01 branch-3.1: [fix](case)update routine load cases to avoid 
kafka case hang #53214 (#53921)
f734e8ecc01 is described below

commit f734e8ecc01d0ae26ea6a050d74422dfbeb7ddde
Author: MoanasDaddyXu <[email protected]>
AuthorDate: Tue Jul 29 08:24:24 2025 +0800

    branch-3.1: [fix](case)update routine load cases to avoid kafka case hang 
#53214 (#53921)
    
    picked from #53214
---
 .../load_p0/routine_load/test_black_list.groovy    |  26 +++
 .../load_p0/routine_load/test_disable_load.groovy  |  26 +++
 ...test_multi_table_load_data_quality_error.groovy |  25 +++
 .../test_multi_table_load_error.groovy             |  25 +++
 .../routine_load/test_out_of_range_error.groovy    |  25 +++
 .../test_routin_load_abnormal_job_monitor.groovy   |  25 +++
 .../routine_load/test_routine_load_alter.groovy    |  25 +++
 .../routine_load/test_routine_load_eof.groovy      |  25 +++
 .../routine_load/test_routine_load_error.groovy    |  25 +++
 .../test_routine_load_error_info.groovy            |  26 +++
 .../test_routine_load_follower_fe.groovy           | 178 +++++++++++++++++++++
 .../routine_load/test_routine_load_metrics.groovy  |  25 +++
 .../routine_load/test_routine_load_offset.groovy   |  25 +++
 .../routine_load/test_routine_load_progress.groovy |  26 +++
 .../routine_load/test_routine_load_property.groovy |  25 +++
 .../test_routine_load_restart_fe.groovy            |  25 +++
 .../routine_load/test_routine_load_schedule.groovy |  25 +++
 .../test_routine_load_timeout_value.groovy         |  26 +++
 .../test_routine_load_topic_change.groovy          |  25 +++
 .../routine_load/test_routine_load_with_sc.groovy  |  25 +++
 .../routine_load/test_routine_load_with_udf.groovy |  25 +++
 .../test_routine_load_with_user.groovy             |  25 +++
 .../routine_load/test_show_routine_load.groovy     |  25 +++
 23 files changed, 733 insertions(+)

diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy 
b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
index 04779f10362..29af1600932 100644
--- a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -34,7 +34,33 @@ suite("test_black_list","nonConcurrent,p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git 
a/regression-test/suites/load_p0/routine_load/test_disable_load.groovy 
b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
index 022a9ae92d0..38651775ab0 100644
--- a/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
@@ -34,7 +34,33 @@ suite("test_disable_load","nonConcurrent,p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git 
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
index 549dbdf3f59..fbd334a1455 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
@@ -34,8 +34,33 @@ suite("test_multi_table_load_data_quality_error","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
index 039e734466e..7ffa6efc149 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -37,8 +37,33 @@ suite("test_multi_table_load_eror","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy 
b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
index 1ae74b73301..c16a1b0dae0 100644
--- a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
@@ -36,8 +36,33 @@ suite("test_out_of_range","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
index 7cc08b5b813..4cc3a743bec 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
@@ -38,8 +38,33 @@ suite("test_routine_load_abnormal_job_monitor","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
index 0089aff61e5..b1d418180eb 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy
@@ -34,8 +34,33 @@ suite("test_routine_load_alter","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index ac0b08248ef..b200cb22e54 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -37,8 +37,33 @@ suite("test_routine_load_eof","nonConcurrent") {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+            // add timeout config
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+            // check connection
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connect fail: 
${e.message}".toString())
+                }
+            }
             // Create kafka producer
             def producer = new KafkaProducer<>(props)
+            try {
+                logger.info("Kafka connecting: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("can't get any kafka info")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e  
+            }
+            logger.info("Kafka connect success")
 
             def count = 0
             while(true) {
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 844d4e5a183..d103dc5eb0f 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -40,8 +40,33 @@ suite("test_routine_load_error","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2f018a93729..7394d72c41d 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -36,7 +36,33 @@ suite("test_routine_load_error_info","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy
new file mode 100644
index 00000000000..8f7ed9a4cf6
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_follower_fe","docker") {
+    def options = new ClusterOptions()
+    // Configure 3 FE nodes cluster
+    options.setFeNum(3)
+    options.setBeNum(1)
+
+    docker(options) {
+        def kafkaCsvTpoics = [
+                  "test_routine_load_follower_fe",
+                ]
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            // 1. send data to kafka
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+            // add timeout config
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+            // check connection
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connect fail: 
${e.message}".toString())
+                }
+            }
+            // Create kafka producer
+            def producer = new KafkaProducer<>(props)
+            try {
+                logger.info("Kafka connecting: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("can't get any kafka info")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e  
+            }
+            logger.info("Kafka connect success")
+            
+            // Send test data to kafka topic
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                // Create simple test data
+                def testData = [
+                    "1,test_data_1,2023-01-01,value1,2023-01-01 
10:00:00,extra1",
+                    "2,test_data_2,2023-01-02,value2,2023-01-02 
11:00:00,extra2",
+                    "3,test_data_3,2023-01-03,value3,2023-01-03 
12:00:00,extra3",
+                    "4,test_data_4,2023-01-04,value4,2023-01-04 
13:00:00,extra4",
+                    "5,test_data_5,2023-01-05,value5,2023-01-05 
14:00:00,extra5"
+                ]
+                
+                testData.each { line ->
+                    logger.info("Sending data to kafka: ${line}")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, 
line)
+                    producer.send(record)
+                }
+            }
+            producer.close()
+            
+            // 3. Connect to a follower FE and create table
+            def masterFe = cluster.getMasterFe()
+            def allFes = cluster.getAllFrontends()
+            def followerFes = allFes.findAll { fe -> fe.index != 
masterFe.index }
+            def followerFe = followerFes[0]
+            logger.info("Master FE: ${masterFe.host}")
+            logger.info("Using follower FE: ${followerFe.host}")
+            // Connect to follower FE
+            def url = String.format(
+                    
"jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
+                    followerFe.host, followerFe.queryPort)
+            logger.info("Connecting to follower FE: ${url}")
+            context.connectTo(url, context.config.jdbcUser, 
context.config.jdbcPassword)
+
+            sql "drop database if exists test_routine_load_follower_fe"
+            sql "create database test_routine_load_follower_fe"
+            sql "use test_routine_load_follower_fe"
+            def tableName = "test_routine_load_follower_fe"
+            def job = "test_follower_routine_load"
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k1` int(20) NULL,
+                    `k2` string NULL,
+                    `v1` date  NULL,
+                    `v2` string  NULL,
+                    `v3` datetime  NULL,
+                    `v4` string  NULL
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k1`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                PROPERTIES ("replication_allocation" = "tag.location.default: 
1");
+            """
+
+            try {
+                // 4. Create routine load job on follower FE
+                sql """
+                    CREATE ROUTINE LOAD ${job} ON ${tableName}
+                    COLUMNS TERMINATED BY ","
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "20",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${kafka_broker}",
+                        "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                        "property.group.id" = "test-follower-consumer-group",
+                        "property.client.id" = "test-follower-client-id",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+
+                // 5. Wait for routine load to process data
+                def count = 0
+                def maxWaitCount = 60 // Poll at most 60 times, sleeping 1s between polls
+                while (count < maxWaitCount) {
+                    def state = sql "show routine load for ${job}"
+                    def routineLoadState = state[0][8].toString()
+                    def statistic = state[0][14].toString()
+                    logger.info("Routine load state: ${routineLoadState}")
+                    logger.info("Routine load statistic: ${statistic}")
+                    
+                    def rowCount = sql "select count(*) from ${tableName}"
+                    // Check if routine load is running and has processed some 
data
+                    if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
+                        break
+                    }
+                    
+                    sleep(1000)
+                    count++
+                }
+            } catch (Exception e) {
+                logger.error("Test failed with exception: ${e.message}")
+            } finally {
+                try {
+                    sql "stop routine load for ${job}"
+                } catch (Exception e) {
+                    logger.warn("Failed to stop routine load job: 
${e.message}")
+                }
+            }
+        }
+    }
+} 
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
index bb1afb6dd34..33b5166a9a8 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
@@ -38,8 +38,33 @@ suite("test_routine_load_metrics","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
index 84d0509cea3..1280d3dbe4c 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
@@ -34,8 +34,33 @@ suite("test_routine_load_offset","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
index c372c5826b2..a353b4da7b1 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
@@ -40,7 +40,33 @@ suite("test_routine_load_progress","docker") {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+            // add timeout config
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+            // check connection
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("=====try to connect Kafka========")
+                    def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connect fail: 
${e.message}".toString())
+                }
+            }
+            // Create kafka producer
             def producer = new KafkaProducer<>(props)
+            try {
+                logger.info("Kafka connecting: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("can't get any kafka info")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e  
+            }
+            logger.info("Kafka connect success")
             for (String kafkaCsvTopic in kafkaCsvTpoics) {
                 def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
                 def lines = txt.readLines()
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
index 9cc1fa0d2d9..cd10af8dfe5 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
@@ -36,8 +36,33 @@ suite("test_routine_load_property","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
index 104026fb16e..cedbf998e47 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_restart_fe.groovy
@@ -38,8 +38,33 @@ suite("test_routine_load_restart_fe", "docker") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
index d0f01fc3d04..2bb08c149e5 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
@@ -37,8 +37,33 @@ suite("test_routine_load_schedule","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
index 873b9458572..937dba4424a 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_timeout_value.groovy
@@ -36,7 +36,33 @@ suite("test_routine_load_timeout_value","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
index 25bf9933d11..09a1f970e2f 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -36,8 +36,33 @@ suite("test_routine_load_topic_change","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
index 33c047062dd..02220a2d1b8 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -36,8 +36,33 @@ suite("test_routine_load_with_sc","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
index 9f4ae866e99..9bbb7ee208d 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
@@ -34,8 +34,33 @@ suite("test_routine_load_with_udf","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
index 3611e1cc0d6..754c5d82aae 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
@@ -39,8 +39,33 @@ suite("test_routine_load_with_user","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
diff --git 
a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy 
b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
index d6b31db11f9..ac3d14ffe37 100644
--- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -34,8 +34,33 @@ suite("test_show_routine_load","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
 
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to