This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 802ba712ac9 [fix](third party) fix hang when destroy of rdkafka
instances (#44913)
802ba712ac9 is described below
commit 802ba712ac9a8190539356bbfd294e103199e40b
Author: hui lai <[email protected]>
AuthorDate: Tue Dec 3 21:34:49 2024 +0800
[fix](third party) fix hang when destroy of rdkafka instances (#44913)
Related PR: https://github.com/confluentinc/librdkafka/pull/4724
---
thirdparty/patches/librdkafka-1.9.2.patch | 111 +++++++++++++++++++++++++++++-
1 file changed, 110 insertions(+), 1 deletion(-)
diff --git a/thirdparty/patches/librdkafka-1.9.2.patch
b/thirdparty/patches/librdkafka-1.9.2.patch
index b13e740bc5c..3caac08f79d 100644
--- a/thirdparty/patches/librdkafka-1.9.2.patch
+++ b/thirdparty/patches/librdkafka-1.9.2.patch
@@ -67,7 +67,19 @@
--- src/rdkafka_broker.c
+++ src/rdkafka_broker.c
-@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
+@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb,
rd_kafka_op_t *rko) {
+ : (topic_err
+ ? topic_err
+ :
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
++
++ if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
++
rd_kafka_toppar_purge_internal_fetch_queue_maybe(
++ rktp);
++ }
+ }
+
+ rd_kafka_toppar_unlock(rktp);
+@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
*/
void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
@@ -78,3 +90,100 @@
rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
+--- src/rdkafka_cgrp.c
++++ src/rdkafka_cgrp.c
+@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t
*rkcg,
+ rd_kafka_toppar_lock(rktp);
+ rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
+ rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
++
++ rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
++
+ rd_kafka_toppar_unlock(rktp);
+
+ rd_list_remove(&rkcg->rkcg_toppars, rktp);
+--- src/rdkafka_partition.c
++++ src/rdkafka_partition.c
+@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
+ rd_kafka_toppar_unlock(rktp);
+ }
+
++/**
++ * @brief Purge internal fetch queue if toppar is stopped
++ * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster
++ * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's
++ * removed starting from a metadata response and stopped from a rebalance or a
++ * consumer close.
++ *
++ * @remark Avoids circular dependencies from `rktp_fetchq` ops back to the same
++ * toppar, which would otherwise stop a consumer from being destroyed.
++ *
++ * @locks rd_kafka_toppar_lock() MUST be held
++ */
++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
++        rd_kafka_q_t *rkq;
++        rkq = rktp->rktp_fetchq;
++        mtx_lock(&rkq->rkq_lock);
++        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE &&
++            !rkq->rkq_fwdq) {
++                rd_kafka_op_t *rko;
++                int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
++
++                /* Partition is being removed from the cluster and it's stopped,
++                 * so rktp->rktp_fetchq->rkq_fwdq is NULL.
++                 * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
++                 * while holding lock, to avoid circular references */
++                rko = TAILQ_FIRST(&rkq->rkq_q);
++                while (rko) {
++                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
++                            rko->rko_type != RD_KAFKA_OP_FETCH) {
++                                rd_kafka_log(
++                                    rktp->rktp_rkt->rkt_rk, LOG_WARNING,
++                                    "PARTDEL",
++                                    "Purging toppar fetch queue buffer op "
++                                    "with unexpected type: %s",
++                                    rd_kafka_op2str(rko->rko_type));
++                        }
++
++                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
++                                barrier_cnt++;
++                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
++                                message_cnt++;
++                        else
++                                other_cnt++;
+
++                        rko = TAILQ_NEXT(rko, rko_link);
++                        cnt++;
++                }
++
++                if (cnt) {
++                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
++                                     "Purge toppar fetch queue buffer "
++                                     "containing %d op(s) "
++                                     "(%d barrier(s), %d message(s), %d other)"
++                                     " to avoid "
++                                     "circular references",
++                                     cnt, barrier_cnt, message_cnt, other_cnt);
++                        rd_kafka_q_purge0(rkq, rd_false);
++                } else {
++                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
++                                     "Not purging toppar fetch queue buffer."
++                                     " No ops present in the buffer.");
++                }
++        }
++        mtx_unlock(&rkq->rkq_lock);
++}
+
+ /**
+ * Helper method for purging queues when removing a toppar.
+--- src/rdkafka_partition.h
++++ src/rdkafka_partition.h
+@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t
*rktp,
+ int64_t query_offset,
+ int backoff_ms);
+
++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t
*rktp);
++
+ int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
+ int purge_flags,
+ rd_bool_t include_xmit_msgq);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]