This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 72bde71602e branch-2.1: (fix)[case] add fast fail to avoid routine
load case hang (#53970)
72bde71602e is described below
commit 72bde71602eeeef1502a356840770bd9e7d557df
Author: MoanasDaddyXu <[email protected]>
AuthorDate: Fri Aug 1 11:26:42 2025 +0800
branch-2.1: (fix)[case] add fast fail to avoid routine load case hang
(#53970)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #53214
---
.../test_multi_table_load_error.groovy | 26 +++++++++++++++++++
.../routine_load/test_out_of_range_error.groovy | 26 +++++++++++++++++++
.../load_p0/routine_load/test_routine_load.groovy | 26 +++++++++++++++++++
.../test_routine_load_condition.groovy | 26 +++++++++++++++++++
.../routine_load/test_routine_load_eof.groovy | 26 +++++++++++++++++++
.../routine_load/test_routine_load_error.groovy | 26 +++++++++++++++++++
.../test_routine_load_error_info.groovy | 28 ++++++++++++++++++++
.../test_routine_load_jsonpath_dollar.groovy | 30 ++++++++++++++++++++++
.../routine_load/test_routine_load_metrics.groovy | 26 +++++++++++++++++++
.../routine_load/test_routine_load_offset.groovy | 26 +++++++++++++++++++
.../routine_load/test_routine_load_property.groovy | 26 +++++++++++++++++++
.../routine_load/test_routine_load_schedule.groovy | 26 +++++++++++++++++++
.../test_routine_load_topic_change.groovy | 26 +++++++++++++++++++
.../routine_load/test_routine_load_with_sc.groovy | 26 +++++++++++++++++++
.../routine_load/test_show_routine_load.groovy | 26 +++++++++++++++++++
15 files changed, 396 insertions(+)
diff --git
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
index 17a8e5da719..d81bddee120 100644
---
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -59,9 +59,35 @@ suite("test_multi_table_load_eror","nonConcurrent") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
index 1ae74b73301..12205801f92 100644
--- a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
@@ -36,9 +36,35 @@ suite("test_out_of_range","nonConcurrent") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index c8aef50bc79..b8eb7fa8ae2 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -182,9 +182,35 @@ suite("test_routine_load","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
index 7735867c749..f53a44cb1da 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_condition","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index ac0b08248ef..b2895452110 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -37,9 +37,35 @@ suite("test_routine_load_eof","nonConcurrent") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
def count = 0
while(true) {
Thread.sleep(1000)
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 825752941d5..7ddac949248 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -40,9 +40,35 @@ suite("test_routine_load_error","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2f018a93729..0b926125434 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -36,7 +36,35 @@ suite("test_routine_load_error_info","nonConcurrent") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
+ // Create kafka producer
def producer = new KafkaProducer<>(props)
+
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
index fed1236ca2e..8763d61572e 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
@@ -17,6 +17,8 @@
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.ProducerConfig
suite("test_routine_load_jsonpath_dollar", "p0") {
def tableName = "test_routine_load_jsonpath_dollar"
@@ -25,6 +27,7 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
String enabled = context.config.otherConfigs.get("enableKafkaTest")
String kafka_port = context.config.otherConfigs.get("kafka_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
if (enabled != null && enabled.equalsIgnoreCase("true")) {
// Send test data to Kafka
@@ -32,8 +35,35 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
props.put("bootstrap.servers",
"${externalEnvIp}:${kafka_port}".toString())
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
+ // Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
def kafkaJson = new
File("""${context.file.parent}/data/${jobName}.json""").text
def lines = kafkaJson.readLines()
lines.each { line ->
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
index bb1afb6dd34..1e262487082 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
@@ -38,9 +38,35 @@ suite("test_routine_load_metrics","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
index 84d0509cea3..ebbd73127dc 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
@@ -34,9 +34,35 @@ suite("test_routine_load_offset","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
index 9cc1fa0d2d9..40c8f40da34 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_property","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
index c8044ad1404..4c75979d644 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
@@ -37,9 +37,35 @@ suite("test_routine_load_schedule","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
index 25bf9933d11..24b8a533b83 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_topic_change","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
index 33c047062dd..5e2dab7b379 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_with_sc","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
diff --git
a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
index 6075dc20dbe..0a97f84c6e5 100644
--- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -34,9 +34,35 @@ suite("test_show_routine_load","p0") {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // add timeout config
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ // check conenction
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
// Create kafka producer
def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]