This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new df3a8545dc [fix](routine_load) Add retry mechanism for routine load task which encounters Broker transport failure (#9067)
df3a8545dc is described below
commit df3a8545dc1729523bf35e61f8c8e8856b438164
Author: caiconghui <[email protected]>
AuthorDate: Wed Apr 20 14:49:58 2022 +0800
[fix](routine_load) Add retry mechanism for routine load task which encounters Broker transport failure (#9067)
---
be/src/runtime/routine_load/data_consumer.cpp | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 2cf330f0b5..33934bbd1d 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -75,6 +75,8 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
// TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
+ RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
+ RETURN_IF_ERROR(set_conf("reconnect.backoff.jitter.ms", "100"));
RETURN_IF_ERROR(set_conf("api.version.request", "true"));
RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
RETURN_IF_ERROR(set_conf("broker.version.fallback",
config::kafka_broker_version_fallback));
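For context, the two properties added above are standard librdkafka settings: socket.keepalive.enable turns on TCP keep-alives on the broker sockets, and reconnect.backoff.jitter.ms adds random jitter to the reconnect backoff. Below is a minimal sketch of setting them through the plain librdkafka C++ API; the conf object and error handling here are illustrative and are not the set_conf() helper used in data_consumer.cpp.

    #include <librdkafka/rdkafkacpp.h>

    #include <iostream>
    #include <memory>
    #include <string>

    // Sketch: apply the same two properties on a raw RdKafka::Conf.
    std::unique_ptr<RdKafka::Conf> make_consumer_conf() {
        std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        std::string errstr;
        // Keep the TCP connections to brokers alive so that idle consumers are
        // less likely to be silently dropped by firewalls or load balancers.
        if (conf->set("socket.keepalive.enable", "true", errstr) != RdKafka::Conf::CONF_OK) {
            std::cerr << "set socket.keepalive.enable failed: " << errstr << std::endl;
        }
        // Add up to 100 ms of jitter to the reconnect backoff so that many
        // consumers do not all reconnect to a recovering broker at once.
        if (conf->set("reconnect.backoff.jitter.ms", "100", errstr) != RdKafka::Conf::CONF_OK) {
            std::cerr << "set reconnect.backoff.jitter.ms failed: " << errstr << std::endl;
        }
        return conf;
    }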
@@ -173,12 +175,14 @@ Status KafkaDataConsumer::assign_topic_partitions(
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                                        int64_t max_running_time_ms) {
+ static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
int64_t left_time = max_running_time_ms;
LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
<< ", max running time(ms): " << left_time;
int64_t received_rows = 0;
int64_t put_rows = 0;
+ int32_t retry_times = 0;
Status st = Status::OK();
MonotonicStopWatch consumer_watch;
MonotonicStopWatch watch;
@@ -220,6 +224,12 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
// if there is no data in kafka.
LOG(INFO) << "kafka consume timeout: " << _id;
break;
+ case RdKafka::ERR__TRANSPORT:
+ LOG(INFO) << "kafka consume Disconnected: " << _id << ", retry times: " << retry_times++;
+ if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ break;
+ }
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
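The new case above follows a general pattern: treat RdKafka::ERR__TRANSPORT as transient, sleep briefly, and only fall through to the fatal error handling once a bounded retry budget is exhausted. Below is a standalone sketch of that pattern against a plain librdkafka consume loop; consumer setup, queue handling, and the Doris-specific logging are omitted, and the names are illustrative rather than taken from data_consumer.cpp.

    #include <librdkafka/rdkafkacpp.h>

    #include <chrono>
    #include <iostream>
    #include <memory>
    #include <thread>

    // Sketch: bounded retry on transient transport errors while consuming.
    void consume_loop(RdKafka::KafkaConsumer* consumer) {
        constexpr int kMaxTransportRetries = 3;
        int retry_times = 0;
        bool done = false;
        while (!done) {
            std::unique_ptr<RdKafka::Message> msg(consumer->consume(/*timeout_ms=*/1000));
            switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
                // Hand msg->payload() / msg->len() to the caller. Resetting the
                // counter here is a small variation; the patch keeps one counter
                // per group_consume() call.
                retry_times = 0;
                break;
            case RdKafka::ERR__TIMED_OUT:
                // No data available right now; not an error.
                break;
            case RdKafka::ERR__TRANSPORT:
                // Broker connection dropped: back off and retry a few times.
                if (++retry_times <= kMaxTransportRetries) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(200));
                    break;
                }
                [[fallthrough]];
            default:
                std::cerr << "kafka consume failed: " << msg->errstr() << std::endl;
                done = true;
            }
        }
    }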