This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 72bde71602e branch-2.1: (fix)[case] add fast fail to avoid routine load case hang (#53970)
72bde71602e is described below

commit 72bde71602eeeef1502a356840770bd9e7d557df
Author: MoanasDaddyXu <[email protected]>
AuthorDate: Fri Aug 1 11:26:42 2025 +0800

    branch-2.1: (fix)[case] add fast fail to avoid routine load case hang (#53970)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #53214
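    
    In essence, every suite gains the same guard: bounded producer timeouts
    plus an up-front metadata probe, so an unreachable Kafka broker fails the
    case within seconds instead of hanging it. Below is a minimal,
    self-contained sketch of that pattern; the broker address is a
    placeholder (the suites read it from config), and the probe topic need
    not exist, since the metadata fetch alone exercises the connection:
    
    ```groovy
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerConfig
    
    def kafkaBroker = "127.0.0.1:9092"  // placeholder; real suites use ${externalEnvIp}:${kafka_port}
    Properties props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Bound metadata fetches and requests to 10s instead of blocking indefinitely
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
    
    def producer = new KafkaProducer<String, String>(props)
    try {
        // partitionsFor() forces a metadata round trip; with max.block.ms set,
        // it throws a TimeoutException within 10s when the broker is down
        producer.partitionsFor("__connection_verification_topic")
    } catch (Exception e) {
        producer.close()
        throw new Exception("Kafka connection failed: ${e.message}".toString())
    }
    ```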
---
 .../test_multi_table_load_error.groovy             | 26 +++++++++++++++++++
 .../routine_load/test_out_of_range_error.groovy    | 26 +++++++++++++++++++
 .../load_p0/routine_load/test_routine_load.groovy  | 26 +++++++++++++++++++
 .../test_routine_load_condition.groovy             | 26 +++++++++++++++++++
 .../routine_load/test_routine_load_eof.groovy      | 26 +++++++++++++++++++
 .../routine_load/test_routine_load_error.groovy    | 26 +++++++++++++++++++
 .../test_routine_load_error_info.groovy            | 28 ++++++++++++++++++++
 .../test_routine_load_jsonpath_dollar.groovy       | 30 ++++++++++++++++++++++
 .../routine_load/test_routine_load_metrics.groovy  | 26 +++++++++++++++++++
 .../routine_load/test_routine_load_offset.groovy   | 26 +++++++++++++++++++
 .../routine_load/test_routine_load_property.groovy | 26 +++++++++++++++++++
 .../routine_load/test_routine_load_schedule.groovy | 26 +++++++++++++++++++
 .../test_routine_load_topic_change.groovy          | 26 +++++++++++++++++++
 .../routine_load/test_routine_load_with_sc.groovy  | 26 +++++++++++++++++++
 .../routine_load/test_show_routine_load.groovy     | 26 +++++++++++++++++++
 15 files changed, 396 insertions(+)

diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
index 17a8e5da719..d81bddee120 100644
--- a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -59,9 +59,35 @@ suite("test_multi_table_load_eror","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
index 1ae74b73301..12205801f92 100644
--- a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
@@ -36,9 +36,35 @@ suite("test_out_of_range","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index c8aef50bc79..b8eb7fa8ae2 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -182,9 +182,35 @@ suite("test_routine_load","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
index 7735867c749..f53a44cb1da 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_condition","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index ac0b08248ef..b2895452110 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -37,9 +37,35 @@ suite("test_routine_load_eof","nonConcurrent") {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            // add timeout config so connection failures surface quickly instead of hanging
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+            // check connection
+            def verifyKafkaConnection = { prod ->
+                try {
+                    logger.info("===== try to connect Kafka =====")
+                    def partitions = prod.partitionsFor("__connection_verification_topic")
+                    return partitions != null
+                } catch (Exception e) {
+                    throw new Exception("Kafka connection failed: ${e.message}".toString())
+                }
+            }
             // Create kafka producer
             def producer = new KafkaProducer<>(props)
 
+            try {
+                logger.info("Connecting to Kafka: ${kafka_broker}")
+                if (!verifyKafkaConnection(producer)) {
+                    throw new Exception("cannot get any Kafka metadata")
+                }
+            } catch (Exception e) {
+                logger.error("FATAL: " + e.getMessage())
+                producer.close()
+                throw e
+            }
+            logger.info("Kafka connection succeeded")
+
             def count = 0
             while(true) {
                 Thread.sleep(1000)
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 825752941d5..7ddac949248 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -40,9 +40,35 @@ suite("test_routine_load_error","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2f018a93729..0b926125434 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -36,7 +36,35 @@ suite("test_routine_load_error_info","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
+
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
index fed1236ca2e..8763d61572e 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
@@ -17,6 +17,8 @@
 
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.ProducerConfig
 
 suite("test_routine_load_jsonpath_dollar", "p0") {
     def tableName = "test_routine_load_jsonpath_dollar"
@@ -25,6 +27,7 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
     String enabled = context.config.otherConfigs.get("enableKafkaTest")
     String kafka_port = context.config.otherConfigs.get("kafka_port")
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
 
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         // Send test data to Kafka
@@ -32,8 +35,35 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
         props.put("bootstrap.servers", 
"${externalEnvIp}:${kafka_port}".toString())
         props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
         props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check conenction
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
+        // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
+
         def kafkaJson = new 
File("""${context.file.parent}/data/${jobName}.json""").text
         def lines = kafkaJson.readLines()
         lines.each { line ->
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
index bb1afb6dd34..1e262487082 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
@@ -38,9 +38,35 @@ suite("test_routine_load_metrics","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
index 84d0509cea3..ebbd73127dc 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
@@ -34,9 +34,35 @@ suite("test_routine_load_offset","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
index 9cc1fa0d2d9..40c8f40da34 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_property","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
index c8044ad1404..4c75979d644 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
@@ -37,9 +37,35 @@ suite("test_routine_load_schedule","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
index 25bf9933d11..24b8a533b83 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_topic_change","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
index 33c047062dd..5e2dab7b379 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_with_sc","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
index 6075dc20dbe..0a97f84c6e5 100644
--- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -34,9 +34,35 @@ suite("test_show_routine_load","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so connection failures surface quickly instead of hanging
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
 
+        try {
+            logger.info("Connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot get any Kafka metadata")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
