This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2b4efc76fc0 [fix](routine-load) fix consumer hang when kafka exception
causing can not query (#33492) (#33759)
2b4efc76fc0 is described below
commit 2b4efc76fc043dfbb43c5b06ad7b9d85937978e4
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Apr 18 20:22:49 2024 +0800
[fix](routine-load) fix consumer hang when kafka exception causing can not
query (#33492) (#33759)
---
thirdparty/patches/librdkafka-1.9.2.patch | 31 +++++++++++++++++++++++++++++++
1 file changed, 31 insertions(+)
diff --git a/thirdparty/patches/librdkafka-1.9.2.patch
b/thirdparty/patches/librdkafka-1.9.2.patch
index 38064e751dc..b13e740bc5c 100644
--- a/thirdparty/patches/librdkafka-1.9.2.patch
+++ b/thirdparty/patches/librdkafka-1.9.2.patch
@@ -34,6 +34,37 @@
# Clear define name ($2): caller may have additional checks
mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed"
return 1
+--- src/rdkafka.c
++++ src/rdkafka.c
+@@ -3510,6 +3510,7 @@ rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+ struct rd_kafka_partition_leader *leader;
+ rd_list_t leaders;
+ rd_kafka_resp_err_t err;
++ int tmout;
+
+ partitions = rd_kafka_topic_partition_list_new(1);
+ rktpar =
+@@ -3556,11 +3557,15 @@ rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+ rd_list_destroy(&leaders);
+
+ /* Wait for reply (or timeout) */
+- while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
+- rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
+- rd_kafka_poll_cb,
+- NULL) != RD_KAFKA_OP_RES_YIELD)
+- ;
++ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
++ tmout = rd_timeout_remains(ts_end);
++ if (rd_timeout_expired(tmout)) {
++ state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
++ break;
++ }
++ rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
++ rd_kafka_poll_cb, NULL);
++ }
+
+ rd_kafka_q_destroy_owner(rkq);
+
--- src/rdkafka_broker.c
+++ src/rdkafka_broker.c
@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]