This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 5cba77ba856d19c40466159e45638a9db510c43f Author: caiconghui <[email protected]> AuthorDate: Wed Apr 20 14:49:58 2022 +0800 [fix](routine_load) Add retry mechanism for routine load task which encounter Broker transport failure (#9067) --- be/src/runtime/routine_load/data_consumer.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 2cf330f0b5..33934bbd1d 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -75,6 +75,8 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb() RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0")); RETURN_IF_ERROR(set_conf("auto.offset.reset", "error")); + RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true")); + RETURN_IF_ERROR(set_conf("reconnect.backoff.jitter.ms", "100")); RETURN_IF_ERROR(set_conf("api.version.request", "true")); RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0")); RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback)); @@ -173,12 +175,14 @@ Status KafkaDataConsumer::assign_topic_partitions( Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms) { + static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3; int64_t left_time = max_running_time_ms; LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id << ", max running time(ms): " << left_time; int64_t received_rows = 0; int64_t put_rows = 0; + int32_t retry_times = 0; Status st = Status::OK(); MonotonicStopWatch consumer_watch; MonotonicStopWatch watch; @@ -220,6 +224,12 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue, // if there is no data in kafka. LOG(INFO) << "kafka consume timeout: " << _id; break; + case RdKafka::ERR__TRANSPORT: + LOG(INFO) << "kafka consume Disconnected: " << _id << ", retry times: " << retry_times++; + if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + break; + } default: LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr(); done = true; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
