Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package librdkafka for openSUSE:Factory 
checked in at 2023-05-19 11:54:49
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/librdkafka (Old)
 and      /work/SRC/openSUSE:Factory/.librdkafka.new.1533 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "librdkafka"

Fri May 19 11:54:49 2023 rev:17 rq:1087789 version:2.1.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/librdkafka/librdkafka.changes    2023-05-03 12:56:26.407537367 +0200
+++ /work/SRC/openSUSE:Factory/.librdkafka.new.1533/librdkafka.changes  2023-05-19 11:54:55.559058705 +0200
@@ -1,0 +2,39 @@
+Sun May  7 18:58:32 UTC 2023 - Dirk Müller <[email protected]>
+
+- update to 2.1.1:
+  * Avoid duplicate messages when a fetch response is received
+    in the middle of an offset validation request
+  * Fix segmentation fault when subscribing to a non-existent
+    topic and calling `rd_kafka_message_leader_epoch()` on the polled
+    `rkmessage`
+  * Fix a segmentation fault when fetching from follower and the
+    partition lease expires while waiting for the result of a list offsets
+    operation
+  * Fix documentation for the admin request timeout, incorrectly
+    stating -1 for infinite
+    timeout. That timeout can't be infinite.
+  * Fix CMake pkg-config cURL require and use
+    pkg-config `Requires.private` field
+  * Fixes certain cases where polling would not keep the consumer
+    in the group or make it rejoin it
+  * Fix to the C++ set_leader_epoch method of TopicPartitionImpl,
+    that wasn't storing the passed value
+  * Duplicate messages can be emitted when a fetch response is
+    received in the middle of an offset validation request. Solved by
+    avoiding a restart from last application offset when offset validation
+    succeeds.
+  * When fetching from follower, if the partition lease expires
+    after 5 minutes, and a list offsets operation was requested
+    to retrieve the earliest or latest offset, it resulted in
+    segmentation fault. This was fixed by allowing threads different
+    from the main one to call the `rd_kafka_toppar_set_fetch_state`
+    function, given they hold the lock on the `rktp`.
+  * In v2.1.0, a bug was fixed which caused polling any queue to
+    reset the `max.poll.interval.ms`.
+    Only certain functions were made to reset the timer,
+    but it is possible for the user to obtain the queue with
+    messages from the broker, skipping these functions. This was fixed by
+    encoding in the queue itself whether polling it should reset
+    the timer.
+
+-------------------------------------------------------------------

Old:
----
  v2.1.0.tar.gz

New:
----
  v2.1.1.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ librdkafka.spec ++++++
--- /var/tmp/diff_new_pack.bpsebj/_old  2023-05-19 11:54:58.939078050 +0200
+++ /var/tmp/diff_new_pack.bpsebj/_new  2023-05-19 11:54:58.951078118 +0200
@@ -20,10 +20,10 @@
 # lto breaks crc32 detection in configure script
 # See https://github.com/edenhill/librdkafka/issues/2426
 %ifnarch x86_64
-%define _lto_cflags %{nil}
+%global _lto_cflags %{nil}
 %endif
 Name:           librdkafka
-Version:        2.1.0
+Version:        2.1.1
 Release:        0
 Summary:        The Apache Kafka C/C++ library
 License:        BSD-2-Clause

++++++ v2.1.0.tar.gz -> v2.1.1.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/.semaphore/semaphore.yml 
new/librdkafka-2.1.1/.semaphore/semaphore.yml
--- old/librdkafka-2.1.0/.semaphore/semaphore.yml       2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/.semaphore/semaphore.yml       2023-04-28 
09:54:05.000000000 +0200
@@ -126,7 +126,7 @@
             - make -j -C tests build
             - make -C tests run_local_quick
             - DESTDIR="$PWD/dest" make install
-            - (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.1.0 --cmd 'make quick')
+            - (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.4.0 --cmd 'make quick')
 
 
   - name: 'Linux x64: release artifact docker builds'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/CHANGELOG.md 
new/librdkafka-2.1.1/CHANGELOG.md
--- old/librdkafka-2.1.0/CHANGELOG.md   2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/CHANGELOG.md   2023-04-28 09:54:05.000000000 +0200
@@ -1,3 +1,42 @@
+# librdkafka v2.1.1
+
+librdkafka v2.1.1 is a maintenance release:
+
+ * Avoid duplicate messages when a fetch response is received
+   in the middle of an offset validation request (#4261).
+ * Fix segmentation fault when subscribing to a non-existent topic and
+   calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage` (#4245).
+ * Fix a segmentation fault when fetching from follower and the partition lease
+   expires while waiting for the result of a list offsets operation (#4254).
+ * Fix documentation for the admin request timeout, incorrectly stating -1 for infinite
+   timeout. That timeout can't be infinite.
+ * Fix CMake pkg-config cURL require and use
+   pkg-config `Requires.private` field (@FantasqueX, @stertingen, #4180).
+ * Fixes certain cases where polling would not keep the consumer
+   in the group or make it rejoin it (#4256).
+ * Fix to the C++ set_leader_epoch method of TopicPartitionImpl,
+   that wasn't storing the passed value (@pavel-pimenov, #4267).
+
+## Fixes
+
+### Consumer fixes
+
+ * Duplicate messages can be emitted when a fetch response is received
+   in the middle of an offset validation request. Solved by avoiding
+   a restart from last application offset when offset validation succeeds.
+ * When fetching from follower, if the partition lease expires after 5 minutes,
+   and a list offsets operation was requested to retrieve the earliest
+   or latest offset, it resulted in segmentation fault. This was fixed by
+   allowing threads different from the main one to call
+   the `rd_kafka_toppar_set_fetch_state` function, given they hold
+   the lock on the `rktp`.
+ * In v2.1.0, a bug was fixed which caused polling any queue to reset the
+   `max.poll.interval.ms`. Only certain functions were made to reset the timer,
+   but it is possible for the user to obtain the queue with messages from
+   the broker, skipping these functions. This was fixed by encoding information
+   in a queue itself, that, whether polling, resets the timer.
+
+
 # librdkafka v2.1.0
 
 librdkafka v2.1.0 is a feature release:
@@ -64,11 +103,18 @@
    any of the **seek**, **pause**, **resume** or **rebalancing** operation, `on_consume`
    interceptors might be called incorrectly (maybe multiple times) for not consumed messages.
 
+### Consume API
+
+ * Duplicate messages can be emitted when a fetch response is received
+   in the middle of an offset validation request.
+ * Segmentation fault when subscribing to a non-existent topic and
+   calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage`.
+
 
 
 # librdkafka v2.0.2
 
-librdkafka v2.0.2 is a bugfix release:
+librdkafka v2.0.2 is a maintenance release:
 
 * Fix OpenSSL version in Win32 nuget package (#4152).
 
@@ -76,7 +122,7 @@
 
 # librdkafka v2.0.1
 
-librdkafka v2.0.1 is a bugfix release:
+librdkafka v2.0.1 is a maintenance release:
 
 * Fixed nuget package for Linux ARM64 release (#4150).
 
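For illustration only (not part of the upstream patch or of this package change): with the max.poll.interval.ms fix described above, polling the queue returned by rd_kafka_queue_get_consumer(), or any queue it is forwarded to, counts as a consumer poll and resets the timer. A minimal C sketch of that pattern, with placeholder broker, group and topic names ("localhost:9092", "example-group", "example-topic") and error handling omitted:

#include <librdkafka/rdkafka.h>

/* Sketch: poll the consumer queue directly instead of calling
 * rd_kafka_consumer_poll(); with librdkafka >= 2.1.1 this still resets
 * the max.poll.interval.ms timer, so the consumer stays in the group. */
static void poll_consumer_queue_sketch(void) {
        char errstr[512];
        int i;
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder configuration values */
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "example-group",
                          errstr, sizeof(errstr));

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        rd_kafka_poll_set_consumer(rk); /* redirect rd_kafka_poll() events */

        rd_kafka_topic_partition_list_t *topics =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, "example-topic",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        rd_kafka_queue_t *cq = rd_kafka_queue_get_consumer(rk);

        for (i = 0; i < 100; i++) {
                /* Counts as a consumer poll: resets max.poll.interval.ms */
                rd_kafka_event_t *ev = rd_kafka_queue_poll(cq, 1000);
                if (ev)
                        rd_kafka_event_destroy(ev);
        }

        rd_kafka_queue_destroy(cq); /* must precede rd_kafka_consumer_close() */
        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);
}

The same behaviour is exercised by the new do_test_rejoin_after_interval_expire() cases in tests/0089-max_poll_interval.c further down in this diff.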
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/packaging/cmake/rdkafka.pc.in 
new/librdkafka-2.1.1/packaging/cmake/rdkafka.pc.in
--- old/librdkafka-2.1.0/packaging/cmake/rdkafka.pc.in  2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/packaging/cmake/rdkafka.pc.in  2023-04-28 
09:54:05.000000000 +0200
@@ -6,7 +6,7 @@
 Name: @PKG_CONFIG_NAME@
 Description: @PKG_CONFIG_DESCRIPTION@
 Version: @PKG_CONFIG_VERSION@
-Requires: @PKG_CONFIG_REQUIRES@
+Requires.private: @PKG_CONFIG_REQUIRES_PRIVATE@
 Cflags: @PKG_CONFIG_CFLAGS@
 Libs: @PKG_CONFIG_LIBS@
 Libs.private: @PKG_CONFIG_LIBS_PRIVATE@
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/CMakeLists.txt 
new/librdkafka-2.1.1/src/CMakeLists.txt
--- old/librdkafka-2.1.0/src/CMakeLists.txt     2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/CMakeLists.txt     2023-04-28 09:54:05.000000000 
+0200
@@ -199,7 +199,7 @@
 
 if(WITH_CURL)
   find_package(CURL REQUIRED)
-  target_include_directories(rdkafka PUBLIC ${CURL_INCLUDE_DIRS})
+  target_include_directories(rdkafka PRIVATE ${CURL_INCLUDE_DIRS})
   target_link_libraries(rdkafka PUBLIC ${CURL_LIBRARIES})
 endif()
 
@@ -272,7 +272,7 @@
 
 # Generate pkg-config file
 set(PKG_CONFIG_VERSION "${PROJECT_VERSION}")
-set(PKG_CONFIG_REQUIRES "")
+set(PKG_CONFIG_REQUIRES_PRIVATE "")
 if (WIN32)
   set(PKG_CONFIG_LIBS_PRIVATE "-lws2_32 -lsecur32 -lcrypt32")
 else()
@@ -296,27 +296,27 @@
   set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library")
 
   if(WITH_CURL)
-    string(APPEND PKG_CONFIG_REQUIRES "curl ")
+    string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libcurl ")
   endif()
 
   if(WITH_ZLIB)
-    string(APPEND PKG_CONFIG_REQUIRES "zlib ")
+    string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "zlib ")
   endif()
 
   if(WITH_SSL)
-    string(APPEND PKG_CONFIG_REQUIRES "libssl ")
+    string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libcrypto libssl ")
   endif()
 
   if(WITH_SASL_CYRUS)
-    string(APPEND PKG_CONFIG_REQUIRES "libsasl2 ")
+    string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libsasl2 ")
   endif()
 
   if(WITH_ZSTD)
-    string(APPEND PKG_CONFIG_REQUIRES "libzstd ")
+    string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libzstd ")
   endif()
 
   if(WITH_LZ4_EXT)
-    string(APPEND PKG_CONFIG_REQUIRES "liblz4 ")
+    string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "liblz4 ")
   endif()
 
   set(PKG_CONFIG_CFLAGS "-I\${includedir}")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka.c 
new/librdkafka-2.1.1/src/rdkafka.c
--- old/librdkafka-2.1.0/src/rdkafka.c  2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/src/rdkafka.c  2023-04-28 09:54:05.000000000 +0200
@@ -4003,20 +4003,37 @@
 
 int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
         int r;
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK);
+
+        if (timeout_ms && can_q_contain_fetched_msgs)
+                rd_kafka_app_poll_blocking(rk);
 
         r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
                              rd_kafka_poll_cb, NULL);
 
+        if (can_q_contain_fetched_msgs)
+                rd_kafka_app_polled(rk);
+
         return r;
 }
 
 
 rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
         rd_kafka_op_t *rko;
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);
+
+
+        if (timeout_ms && can_q_contain_fetched_msgs)
+                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
 
         rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
                                   RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
 
+        if (can_q_contain_fetched_msgs)
+                rd_kafka_app_polled(rkqu->rkqu_rk);
+
         if (!rko)
                 return NULL;
 
@@ -4025,10 +4042,18 @@
 
 int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
         int r;
+        const rd_bool_t can_q_contain_fetched_msgs =
+            rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);
+
+        if (timeout_ms && can_q_contain_fetched_msgs)
+                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
 
         r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
                              RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
 
+        if (can_q_contain_fetched_msgs)
+                rd_kafka_app_polled(rkqu->rkqu_rk);
+
         return r;
 }
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka.h 
new/librdkafka-2.1.1/src/rdkafka.h
--- old/librdkafka-2.1.0/src/rdkafka.h  2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/src/rdkafka.h  2023-04-28 09:54:05.000000000 +0200
@@ -166,7 +166,7 @@
  * @remark This value should only be used during compile time,
  *         for runtime checks of version use rd_kafka_version()
  */
-#define RD_KAFKA_VERSION 0x020100ff
+#define RD_KAFKA_VERSION 0x020101ff
 
 /**
  * @brief Returns the librdkafka version as integer.
@@ -3431,6 +3431,12 @@
  *
  * @remark rd_kafka_queue_destroy() MUST be called on this queue
  *         prior to calling rd_kafka_consumer_close().
+ * @remark Polling the returned queue counts as a consumer poll, and will reset
+ *         the timer for max.poll.interval.ms. If this queue is forwarded to a
+ *         "destq", polling destq also counts as a consumer poll (this works
+ *         for any number of forwards). However, even if this queue is
+ *         unforwarded or forwarded elsewhere, polling destq will continue
+ *         to count as a consumer poll.
  */
 RD_EXPORT
 rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk);
@@ -6765,8 +6771,7 @@
  *        request transmission, operation time on broker, and response.
  *
  * @param options Admin options.
- * @param timeout_ms Timeout in milliseconds, use -1 for indefinite timeout.
- *                   Defaults to `socket.timeout.ms`.
+ * @param timeout_ms Timeout in milliseconds. Defaults to `socket.timeout.ms`.
  * @param errstr A human readable error string (nul-terminated) is written to
  *               this location that must be of at least \p errstr_size bytes.
  *               The \p errstr is only written in case of error.
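As an aside (an illustrative sketch, not taken from the patch): the corrected documentation above means the admin request timeout is always a finite number of milliseconds, set per request through rd_kafka_AdminOptions_set_request_timeout(). The operation type and the 30 s value below are arbitrary examples, and 'rk' is assumed to be an existing client handle:

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: set a finite admin request timeout; -1 does not mean infinite. */
static rd_kafka_AdminOptions_t *make_admin_options_sketch(rd_kafka_t *rk) {
        char errstr[256];
        rd_kafka_AdminOptions_t *options =
            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);

        /* Covers request transmission, operation time on the broker and the
         * response; defaults to socket.timeout.ms when not set. */
        if (rd_kafka_AdminOptions_set_request_timeout(options, 30 * 1000,
                                                      errstr, sizeof(errstr)))
                fprintf(stderr, "set_request_timeout failed: %s\n", errstr);

        return options;
}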
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_cgrp.c 
new/librdkafka-2.1.1/src/rdkafka_cgrp.c
--- old/librdkafka-2.1.0/src/rdkafka_cgrp.c     2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_cgrp.c     2023-04-28 09:54:05.000000000 
+0200
@@ -415,7 +415,7 @@
         rkcg->rkcg_wait_coord_q             = rd_kafka_q_new(rk);
         rkcg->rkcg_wait_coord_q->rkq_serve  = rkcg->rkcg_ops->rkq_serve;
         rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
-        rkcg->rkcg_q                        = rd_kafka_q_new(rk);
+        rkcg->rkcg_q                        = rd_kafka_consume_q_new(rk);
         rkcg->rkcg_group_instance_id =
             rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_fetcher.c 
new/librdkafka-2.1.1/src/rdkafka_fetcher.c
--- old/librdkafka-2.1.0/src/rdkafka_fetcher.c  2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_fetcher.c  2023-04-28 09:54:05.000000000 
+0200
@@ -975,7 +975,25 @@
         return cnt;
 }
 
-
+/**
+ * @brief Decide whether it should start fetching from next fetch start
+ *        or continue with current fetch pos.
+ *
+ * @param rktp the toppar
+ *
+ * @returns rd_true if it should start fetching from next fetch start,
+ *          rd_false otherwise.
+ *
+ * @locality any
+ * @locks toppar_lock() MUST be held
+ */
+rd_bool_t rd_kafka_toppar_fetch_decide_start_from_next_fetch_start(
+    rd_kafka_toppar_t *rktp) {
+        return rktp->rktp_op_version > rktp->rktp_fetch_version ||
+               rd_kafka_fetch_pos_cmp(&rktp->rktp_next_fetch_start,
+                                      &rktp->rktp_last_next_fetch_start) ||
+               rktp->rktp_offsets.fetch_pos.offset == RD_KAFKA_OFFSET_INVALID;
+}
 
 /**
  * @brief Decide whether this toppar should be on the fetch list or not.
@@ -1037,10 +1055,7 @@
 
         /* Update broker thread's fetch op version */
         version = rktp->rktp_op_version;
-        if (version > rktp->rktp_fetch_version ||
-            rd_kafka_fetch_pos_cmp(&rktp->rktp_next_fetch_start,
-                                   &rktp->rktp_last_next_fetch_start) ||
-            rktp->rktp_offsets.fetch_pos.offset == RD_KAFKA_OFFSET_INVALID) {
+        if (rd_kafka_toppar_fetch_decide_start_from_next_fetch_start(rktp)) {
                 /* New version barrier, something was modified from the
                  * control plane. Reset and start over.
                  * Alternatively only the next_offset changed but not the
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_fetcher.h 
new/librdkafka-2.1.1/src/rdkafka_fetcher.h
--- old/librdkafka-2.1.0/src/rdkafka_fetcher.h  2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_fetcher.h  2023-04-28 09:54:05.000000000 
+0200
@@ -33,6 +33,9 @@
 
 int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now);
 
+rd_bool_t rd_kafka_toppar_fetch_decide_start_from_next_fetch_start(
+    rd_kafka_toppar_t *rktp);
+
 rd_ts_t rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t *rktp,
                                      rd_kafka_broker_t *rkb,
                                      int force_remove);
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_msg.c 
new/librdkafka-2.1.1/src/rdkafka_msg.c
--- old/librdkafka-2.1.0/src/rdkafka_msg.c      2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_msg.c      2023-04-28 09:54:05.000000000 
+0200
@@ -1562,8 +1562,8 @@
 
 int32_t rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage) {
         rd_kafka_msg_t *rkm;
-
-        if (unlikely(!rkmessage->rkt ||
+        if (unlikely(!rkmessage->rkt || rd_kafka_rkt_is_lw(rkmessage->rkt) ||
+                     !rkmessage->rkt->rkt_rk ||
                      rkmessage->rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER))
                 return -1;
 
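A small usage sketch (illustrative, not part of the patch): with the extra guard above, rd_kafka_message_leader_epoch() returns -1 for error messages, for example one polled after subscribing to a non-existent topic, instead of crashing on the lightweight topic object. The consumer handle 'rk' is assumed to exist and to have an active subscription:

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: leader epoch of a polled message; -1 for error messages and
 * for anything that is not a consumed message. */
static void print_leader_epoch_sketch(rd_kafka_t *rk) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 1000);
        if (!rkmessage)
                return;

        int32_t leader_epoch = rd_kafka_message_leader_epoch(rkmessage);
        if (rkmessage->err)
                fprintf(stderr, "error: %s (leader epoch %d)\n",
                        rd_kafka_message_errstr(rkmessage), (int)leader_epoch);
        else
                printf("offset %lld, leader epoch %d\n",
                       (long long)rkmessage->offset, (int)leader_epoch);

        rd_kafka_message_destroy(rkmessage);
}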
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_offset.c 
new/librdkafka-2.1.1/src/rdkafka_offset.c
--- old/librdkafka-2.1.0/src/rdkafka_offset.c   2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_offset.c   2023-04-28 09:54:05.000000000 
+0200
@@ -643,7 +643,8 @@
         rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
         rd_kafka_toppar_t *rktp;
         rd_kafka_resp_err_t err;
-        rd_kafka_fetch_pos_t pos = {offset + 1, -1 /*no leader epoch known*/};
+        rd_kafka_fetch_pos_t pos =
+            RD_KAFKA_FETCH_POS(offset + 1, -1 /*no leader epoch known*/);
 
         /* Find toppar */
         rd_kafka_topic_rdlock(rkt);
@@ -675,7 +676,8 @@
         for (i = 0; i < offsets->cnt; i++) {
                 rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
                 rd_kafka_toppar_t *rktp;
-                rd_kafka_fetch_pos_t pos = {rktpar->offset, -1};
+                rd_kafka_fetch_pos_t pos =
+                    RD_KAFKA_FETCH_POS(rktpar->offset, -1);
 
                 rktp =
                     rd_kafka_topic_partition_get_toppar(rk, rktpar, rd_false);
@@ -721,8 +723,8 @@
                                           "Invalid message object, "
                                           "not a consumed message");
 
-        pos.offset       = rkmessage->offset + 1;
-        pos.leader_epoch = rkm->rkm_u.consumer.leader_epoch;
+        pos = RD_KAFKA_FETCH_POS(rkmessage->offset + 1,
+                                 rkm->rkm_u.consumer.leader_epoch);
         err = rd_kafka_offset_store0(rktp, pos, rd_false /* Don't force */,
                                      RD_DO_LOCK);
 
@@ -956,9 +958,6 @@
                                    "supported by broker: validation skipped",
                                    RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                    rktp->rktp_partition);
-                        /* Reset the epoch to -1 since it can't be used with
-                         * older brokers. */
-                        rktp->rktp_next_fetch_start.leader_epoch = -1;
                         rd_kafka_toppar_set_fetch_state(
                             rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
                         goto done;
@@ -1020,17 +1019,18 @@
 
         if (end_offset < 0 || end_offset_leader_epoch < 0) {
                 rd_kafka_offset_reset(
-                    rktp, rd_kafka_broker_id(rkb), rktp->rktp_next_fetch_start,
+                    rktp, rd_kafka_broker_id(rkb),
+                    rktp->rktp_offset_validation_pos,
                     RD_KAFKA_RESP_ERR__LOG_TRUNCATION,
                     "No epoch found less or equal to "
                     "%s: broker end offset is %" PRId64
                     " (offset leader epoch %" PRId32
                     ")."
                     " Reset using configured policy.",
-                    rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start),
+                    rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos),
                     end_offset, end_offset_leader_epoch);
 
-        } else if (end_offset < rktp->rktp_next_fetch_start.offset) {
+        } else if (end_offset < rktp->rktp_offset_validation_pos.offset) {
 
                 if (rktp->rktp_rkt->rkt_conf.auto_offset_reset ==
                     RD_KAFKA_OFFSET_INVALID /* auto.offset.reset=error */) {
@@ -1044,7 +1044,8 @@
                             " (offset leader epoch %" PRId32
                             "). "
                             "Reset to INVALID.",
-                            rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start),
+                            rd_kafka_fetch_pos2str(
+                                rktp->rktp_offset_validation_pos),
                             end_offset, end_offset_leader_epoch);
 
                 } else {
@@ -1074,8 +1075,6 @@
                            rktp->rktp_partition, end_offset,
                            end_offset_leader_epoch);
 
-                rktp->rktp_next_fetch_start.leader_epoch =
-                    end_offset_leader_epoch;
                 rd_kafka_toppar_set_fetch_state(rktp,
                                                 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
         }
@@ -1166,7 +1165,7 @@
          * there is no point in doing validation.
          * This is the case for epoch-less seek()s or epoch-less
          * committed offsets. */
-        if (rktp->rktp_next_fetch_start.leader_epoch == -1) {
+        if (rktp->rktp_offset_validation_pos.leader_epoch == -1) {
                 rd_kafka_dbg(
                     rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE",
                     "%.*s [%" PRId32
@@ -1174,7 +1173,7 @@
                     "validation for %s: no leader epoch set",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition,
-                    rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start));
+                    rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos));
                 rd_kafka_toppar_set_fetch_state(rktp,
                                                 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
                 return;
@@ -1188,18 +1187,18 @@
         rktpar = rd_kafka_topic_partition_list_add(
             parts, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
         rd_kafka_topic_partition_set_leader_epoch(
-            rktpar, rktp->rktp_next_fetch_start.leader_epoch);
+            rktpar, rktp->rktp_offset_validation_pos.leader_epoch);
         rd_kafka_topic_partition_set_current_leader_epoch(
             rktpar, rktp->rktp_leader_epoch);
         rd_kafka_toppar_keep(rktp); /* for request opaque */
 
-        rd_rkb_dbg(rktp->rktp_leader, FETCH, "VALIDATE",
-                   "%.*s [%" PRId32
-                   "]: querying broker for epoch "
-                   "validation of %s: %s",
-                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                   rktp->rktp_partition,
-                   rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start), reason);
+        rd_rkb_dbg(
+            rktp->rktp_leader, FETCH, "VALIDATE",
+            "%.*s [%" PRId32
+            "]: querying broker for epoch "
+            "validation of %s: %s",
+            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition,
+            rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos), reason);
 
         rd_kafka_OffsetForLeaderEpochRequest(
             rktp->rktp_leader, parts, RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_partition.c 
new/librdkafka-2.1.1/src/rdkafka_partition.c
--- old/librdkafka-2.1.0/src/rdkafka_partition.c        2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/src/rdkafka_partition.c        2023-04-28 
09:54:05.000000000 +0200
@@ -243,6 +243,7 @@
         rd_kafka_fetch_pos_init(&rktp->rktp_query_pos);
         rd_kafka_fetch_pos_init(&rktp->rktp_next_fetch_start);
         rd_kafka_fetch_pos_init(&rktp->rktp_last_next_fetch_start);
+        rd_kafka_fetch_pos_init(&rktp->rktp_offset_validation_pos);
         rd_kafka_fetch_pos_init(&rktp->rktp_app_pos);
         rd_kafka_fetch_pos_init(&rktp->rktp_stored_pos);
         rd_kafka_fetch_pos_init(&rktp->rktp_committing_pos);
@@ -252,7 +253,7 @@
         mtx_init(&rktp->rktp_lock, mtx_plain);
 
         rd_refcnt_init(&rktp->rktp_refcnt, 0);
-        rktp->rktp_fetchq          = rd_kafka_q_new(rkt->rkt_rk);
+        rktp->rktp_fetchq          = rd_kafka_consume_q_new(rkt->rkt_rk);
         rktp->rktp_ops             = rd_kafka_q_new(rkt->rkt_rk);
         rktp->rktp_ops->rkq_serve  = rd_kafka_toppar_op_serve;
         rktp->rktp_ops->rkq_opaque = rktp;
@@ -359,9 +360,6 @@
  * @locks_required rd_kafka_toppar_lock() MUST be held.
  */
 void rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t *rktp, int fetch_state) 
{
-        rd_kafka_assert(NULL,
-                        thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
-
         if ((int)rktp->rktp_fetch_state == fetch_state)
                 return;
 
@@ -1798,6 +1796,7 @@
                 rd_kafka_toppar_set_fetch_state(
                     rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
                 rd_kafka_toppar_set_next_fetch_position(rktp, pos);
+                rd_kafka_toppar_set_offset_validation_position(rktp, pos);
                 rd_kafka_offset_validate(rktp, "seek");
         }
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_partition.h 
new/librdkafka-2.1.1/src/rdkafka_partition.h
--- old/librdkafka-2.1.0/src/rdkafka_partition.h        2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/src/rdkafka_partition.h        2023-04-28 
09:54:05.000000000 +0200
@@ -325,6 +325,10 @@
          *  @locality toppar thread */
         rd_kafka_fetch_pos_t rktp_last_next_fetch_start;
 
+        /** The offset to verify.
+         *  @locality toppar thread */
+        rd_kafka_fetch_pos_t rktp_offset_validation_pos;
+
         /** Application's position.
          *  This is the latest offset delivered to application + 1.
          *  It is reset to INVALID_OFFSET when partition is
@@ -1047,7 +1051,7 @@
  * @brief Set's the partitions next fetch position, i.e., the next offset
  *        to start fetching from.
  *
- * @locks_required rd_kafka_toppar_lock(rktp) MUST be held.
+ * @locks rd_kafka_toppar_lock(rktp) MUST be held.
  */
 static RD_UNUSED RD_INLINE void
 rd_kafka_toppar_set_next_fetch_position(rd_kafka_toppar_t *rktp,
@@ -1055,4 +1059,15 @@
         rktp->rktp_next_fetch_start = next_pos;
 }
 
+/**
+ * @brief Sets the offset validation position.
+ *
+ * @locks rd_kafka_toppar_lock(rktp) MUST be held.
+ */
+static RD_UNUSED RD_INLINE void rd_kafka_toppar_set_offset_validation_position(
+    rd_kafka_toppar_t *rktp,
+    rd_kafka_fetch_pos_t offset_validation_pos) {
+        rktp->rktp_offset_validation_pos = offset_validation_pos;
+}
+
 #endif /* _RDKAFKA_PARTITION_H_ */
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_queue.c 
new/librdkafka-2.1.1/src/rdkafka_queue.c
--- old/librdkafka-2.1.0/src/rdkafka_queue.c    2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_queue.c    2023-04-28 09:54:05.000000000 
+0200
@@ -83,12 +83,15 @@
  */
 void rd_kafka_q_init0(rd_kafka_q_t *rkq,
                       rd_kafka_t *rk,
+                      rd_bool_t for_consume,
                       const char *func,
                       int line) {
         rd_kafka_q_reset(rkq);
         rkq->rkq_fwdq   = NULL;
         rkq->rkq_refcnt = 1;
         rkq->rkq_flags  = RD_KAFKA_Q_F_READY;
+        if (for_consume)
+                rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;
         rkq->rkq_rk     = rk;
         rkq->rkq_qio    = NULL;
         rkq->rkq_serve  = NULL;
@@ -106,9 +109,15 @@
 /**
  * Allocate a new queue and initialize it.
  */
-rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line) {
+rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
+                              rd_bool_t for_consume,
+                              const char *func,
+                              int line) {
         rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq));
-        rd_kafka_q_init(rkq, rk);
+        if (!for_consume)
+                rd_kafka_q_init(rkq, rk);
+        else
+                rd_kafka_consume_q_init(rkq, rk);
         rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
 #if ENABLE_DEVEL
         rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
@@ -118,6 +127,33 @@
         return rkq;
 }
 
+/*
+ * Sets the flag RD_KAFKA_Q_F_CONSUMER for rkq, any queues it's being forwarded
+ * to, recursively.
+ * Setting this flag indicates that polling this queue is equivalent to calling
+ * consumer poll, and will reset the max.poll.interval.ms timer. Only used
+ * internally when forwarding queues.
+ * @locks rd_kafka_q_lock(rkq)
+ */
+static void rd_kafka_q_consumer_propagate(rd_kafka_q_t *rkq) {
+        mtx_lock(&rkq->rkq_lock);
+        rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER;
+
+        if (!rkq->rkq_fwdq) {
+                mtx_unlock(&rkq->rkq_lock);
+                return;
+        }
+
+        /* Recursively propagate the flag to any queues rkq is already
+         * forwarding to. There will be a deadlock here if the queues are being
+         * forwarded circularly, but that is a user error. We can't resolve this
+         * deadlock by unlocking before the recursive call, because that leads
+         * to incorrectness if the rkq_fwdq is forwarded elsewhere and the old
+         * one destroyed between recursive calls. */
+        rd_kafka_q_consumer_propagate(rkq->rkq_fwdq);
+        mtx_unlock(&rkq->rkq_lock);
+}
+
 /**
  * Set/clear forward queue.
  * Queue forwarding enables message routing inside rdkafka.
@@ -152,6 +188,9 @@
                 }
 
                 srcq->rkq_fwdq = destq;
+
+                if (srcq->rkq_flags & RD_KAFKA_Q_F_CONSUMER)
+                        rd_kafka_q_consumer_propagate(destq);
         }
         if (do_lock)
                 mtx_unlock(&srcq->rkq_lock);
@@ -610,6 +649,7 @@
                 rd_kafka_q_destroy(fwdq);
                 return cnt;
         }
+
         mtx_unlock(&rkq->rkq_lock);
 
         if (timeout_ms)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_queue.h 
new/librdkafka-2.1.1/src/rdkafka_queue.h
--- old/librdkafka-2.1.0/src/rdkafka_queue.h    2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_queue.h    2023-04-28 09:54:05.000000000 
+0200
@@ -75,6 +75,11 @@
             * by triggering the cond-var                                      \
             * but without having to enqueue                                   \
             * an op. */
+#define RD_KAFKA_Q_F_CONSUMER                                                  \
+        0x10 /* If this flag is set, this queue might contain fetched messages \
+                from partitions. Polling this queue will reset the             \
+                max.poll.interval.ms timer. Once set, this flag is never       \
+                reset. */
 
         rd_kafka_t *rkq_rk;
         struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */
@@ -123,12 +128,20 @@
 
 void rd_kafka_q_init0(rd_kafka_q_t *rkq,
                       rd_kafka_t *rk,
+                      rd_bool_t for_consume,
                       const char *func,
                       int line);
#define rd_kafka_q_init(rkq, rk)                                               \
-        rd_kafka_q_init0(rkq, rk, __FUNCTION__, __LINE__)
-rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line);
-#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, __FUNCTION__, __LINE__)
+        rd_kafka_q_init0(rkq, rk, rd_false, __FUNCTION__, __LINE__)
+#define rd_kafka_consume_q_init(rkq, rk)                                       \
+        rd_kafka_q_init0(rkq, rk, rd_true, __FUNCTION__, __LINE__)
+rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk,
+                              rd_bool_t for_consume,
+                              const char *func,
+                              int line);
+#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, rd_false, __FUNCTION__, __LINE__)
+#define rd_kafka_consume_q_new(rk)                                             \
+        rd_kafka_q_new0(rk, rd_true, __FUNCTION__, __LINE__)
 void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq);
 
 #define rd_kafka_q_lock(rkqu)   mtx_lock(&(rkqu)->rkq_lock)
@@ -1164,6 +1177,22 @@
         return rko;
 }
 
+/**
+ * @brief Returns true if the queue can contain fetched messages.
+ *
+ * @locks rd_kafka_q_lock(rkq) if do_lock is set.
+ */
+static RD_INLINE RD_UNUSED rd_bool_t
+rd_kafka_q_can_contain_fetched_msgs(rd_kafka_q_t *rkq, rd_bool_t do_lock) {
+        rd_bool_t val;
+        if (do_lock)
+                mtx_lock(&rkq->rkq_lock);
+        val = rkq->rkq_flags & RD_KAFKA_Q_F_CONSUMER;
+        if (do_lock)
+                mtx_unlock(&rkq->rkq_lock);
+        return val;
+}
+
 
 /**@}*/
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_topic.c 
new/librdkafka-2.1.1/src/rdkafka_topic.c
--- old/librdkafka-2.1.0/src/rdkafka_topic.c    2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src/rdkafka_topic.c    2023-04-28 09:54:05.000000000 
+0200
@@ -39,6 +39,7 @@
 #include "rdsysqueue.h"
 #include "rdtime.h"
 #include "rdregex.h"
+#include "rdkafka_fetcher.h"
 
 #if WITH_ZSTD
 #include <zstd.h>
@@ -725,11 +726,16 @@
         }
 
         if (need_epoch_validation) {
-                /* Update next fetch position, that could be stale since last
-                 * fetch start. Only if the app pos is real. */
-                if (rktp->rktp_app_pos.offset > 0) {
-                        rd_kafka_toppar_set_next_fetch_position(
-                            rktp, rktp->rktp_app_pos);
+                /* Set the offset validation position, depending on whether it
+                 * should continue with the current position or with the next
+                 * fetch start position. */
+                if (rd_kafka_toppar_fetch_decide_start_from_next_fetch_start(
+                        rktp)) {
+                        rd_kafka_toppar_set_offset_validation_position(
+                            rktp, rktp->rktp_next_fetch_start);
+                } else {
+                        rd_kafka_toppar_set_offset_validation_position(
+                            rktp, rktp->rktp_offsets.fetch_pos);
                 }
                 rd_kafka_offset_validate(rktp, "epoch updated from metadata");
         }
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src-cpp/CMakeLists.txt 
new/librdkafka-2.1.1/src-cpp/CMakeLists.txt
--- old/librdkafka-2.1.0/src-cpp/CMakeLists.txt 2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src-cpp/CMakeLists.txt 2023-04-28 09:54:05.000000000 
+0200
@@ -41,7 +41,7 @@
 if(NOT RDKAFKA_BUILD_STATIC)
   set(PKG_CONFIG_NAME "librdkafka++")
   set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library")
-  set(PKG_CONFIG_REQUIRES "rdkafka")
+  set(PKG_CONFIG_REQUIRES_PRIVATE "rdkafka")
   set(PKG_CONFIG_CFLAGS "-I\${includedir}")
   set(PKG_CONFIG_LIBS "-L\${libdir} -lrdkafka++")
   set(PKG_CONFIG_LIBS_PRIVATE "-lrdkafka")
@@ -57,7 +57,7 @@
 else()
   set(PKG_CONFIG_NAME "librdkafka++-static")
   set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library (static)")
-  set(PKG_CONFIG_REQUIRES "")
+  set(PKG_CONFIG_REQUIRES_PRIVATE "")
   set(PKG_CONFIG_CFLAGS "-I\${includedir} -DLIBRDKAFKA_STATICLIB")
   set(PKG_CONFIG_LIBS "-L\${libdir} \${libdir}/librdkafka++.a")
   if(WIN32)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src-cpp/rdkafkacpp.h 
new/librdkafka-2.1.1/src-cpp/rdkafkacpp.h
--- old/librdkafka-2.1.0/src-cpp/rdkafkacpp.h   2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/src-cpp/rdkafkacpp.h   2023-04-28 09:54:05.000000000 
+0200
@@ -111,7 +111,7 @@
  * @remark This value should only be used during compile time,
  *         for runtime checks of version use RdKafka::version()
  */
-#define RD_KAFKA_VERSION 0x020100ff
+#define RD_KAFKA_VERSION 0x020101ff
 
 /**
  * @brief Returns the librdkafka version as integer.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h 
new/librdkafka-2.1.1/src-cpp/rdkafkacpp_int.h
--- old/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h       2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/src-cpp/rdkafkacpp_int.h       2023-04-28 
09:54:05.000000000 +0200
@@ -1289,7 +1289,7 @@
   }
 
   void set_leader_epoch(int32_t leader_epoch) {
-    leader_epoch_ = leader_epoch_;
+    leader_epoch_ = leader_epoch;
   }
 
   std::ostream &operator<<(std::ostream &ostrm) const {
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/tests/0033-regex_subscribe.c 
new/librdkafka-2.1.1/tests/0033-regex_subscribe.c
--- old/librdkafka-2.1.0/tests/0033-regex_subscribe.c   2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/tests/0033-regex_subscribe.c   2023-04-28 
09:54:05.000000000 +0200
@@ -174,6 +174,13 @@
                          rkmessage->partition, rkmessage->offset);
 
         } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
+                /* Test segfault associated with this call is solved */
+                int32_t leader_epoch = rd_kafka_message_leader_epoch(rkmessage);
+                TEST_ASSERT(leader_epoch == -1,
+                            "rd_kafka_message_leader_epoch should be -1"
+                            ", got %" PRId32,
+                            leader_epoch);
+
                 if (strstr(rd_kafka_topic_name(rkmessage->rkt), "NONEXIST"))
                         TEST_SAY("%s: %s: error is expected for this topic\n",
                                  rd_kafka_topic_name(rkmessage->rkt),
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/tests/0089-max_poll_interval.c 
new/librdkafka-2.1.1/tests/0089-max_poll_interval.c
--- old/librdkafka-2.1.0/tests/0089-max_poll_interval.c 2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/tests/0089-max_poll_interval.c 2023-04-28 
09:54:05.000000000 +0200
@@ -351,8 +351,95 @@
         SUB_TEST_PASS();
 }
 
+
+/**
+ * @brief Consumer should be able to rejoin the group just by polling after
+ * leaving due to a max.poll.interval.ms timeout. The poll does not need to
+ * go through any special function, any queue containing consumer messages
+ * should suffice.
+ * We test with the result of rd_kafka_queue_get_consumer, and an arbitrary
+ * queue that is forwarded to by the result of rd_kafka_queue_get_consumer.
+ */
+static void
+do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) {
+        const char *topic = test_mk_topic_name("0089_max_poll_interval", 1);
+        rd_kafka_conf_t *conf;
+        char groupid[64];
+        rd_kafka_t *rk                   = NULL;
+        rd_kafka_queue_t *consumer_queue = NULL;
+        rd_kafka_event_t *event          = NULL;
+        rd_kafka_queue_t *polling_queue  = NULL;
+
+        SUB_TEST("Testing with forward_to_another_q = %d",
+                 forward_to_another_q);
+
+        test_create_topic(NULL, topic, 1, 1);
+
+        test_str_id_generate(groupid, sizeof(groupid));
+        test_conf_init(&conf, NULL, 60);
+        test_conf_set(conf, "session.timeout.ms", "6000");
+        test_conf_set(conf, "max.poll.interval.ms", "10000" /*10s*/);
+        test_conf_set(conf, "partition.assignment.strategy", "range");
+
+        /* We need to specify a non-NULL rebalance CB to get events of type
+         * RD_KAFKA_EVENT_REBALANCE. */
+        rk = test_create_consumer(groupid, test_rebalance_cb, conf, NULL);
+
+        consumer_queue = rd_kafka_queue_get_consumer(rk);
+
+        test_consumer_subscribe(rk, topic);
+
+        if (forward_to_another_q) {
+                polling_queue = rd_kafka_queue_new(rk);
+                rd_kafka_queue_forward(consumer_queue, polling_queue);
+        } else
+                polling_queue = consumer_queue;
+
+        event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE,
+                                (int)(test_timeout_multiplier * 10000));
+        TEST_ASSERT(event,
+                    "Did not get a rebalance event for initial group join");
+        TEST_ASSERT(rd_kafka_event_error(event) ==
+                        RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                    "Group join should assign partitions");
+        rd_kafka_assign(rk, rd_kafka_event_topic_partition_list(event));
+        rd_kafka_event_destroy(event);
+
+        rd_sleep(10 + 1); /* Exceed max.poll.interval.ms. */
+
+        /* Note that by polling for the group leave, we're also polling the
+         * consumer queue, and hence it should trigger a rejoin. */
+        event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE,
+                                (int)(test_timeout_multiplier * 10000));
+        TEST_ASSERT(event, "Did not get a rebalance event for the group leave");
+        TEST_ASSERT(rd_kafka_event_error(event) ==
+                        RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+                    "Group leave should revoke partitions");
+        rd_kafka_assign(rk, NULL);
+        rd_kafka_event_destroy(event);
+
+        event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE,
+                                (int)(test_timeout_multiplier * 10000));
+        TEST_ASSERT(event, "Should get a rebalance event for the group rejoin");
+        TEST_ASSERT(rd_kafka_event_error(event) ==
+                        RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                    "Group rejoin should assign partitions");
+        rd_kafka_assign(rk, rd_kafka_event_topic_partition_list(event));
+        rd_kafka_event_destroy(event);
+
+        if (forward_to_another_q)
+                rd_kafka_queue_destroy(polling_queue);
+        rd_kafka_queue_destroy(consumer_queue);
+        test_consumer_close(rk);
+        rd_kafka_destroy(rk);
+
+        SUB_TEST_PASS();
+}
+
 int main_0089_max_poll_interval(int argc, char **argv) {
         do_test();
         do_test_with_log_queue();
+        do_test_rejoin_after_interval_expire(rd_false);
+        do_test_rejoin_after_interval_expire(rd_true);
         return 0;
 }
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/tests/0139-offset_validation_mock.c 
new/librdkafka-2.1.1/tests/0139-offset_validation_mock.c
--- old/librdkafka-2.1.0/tests/0139-offset_validation_mock.c    1970-01-01 
01:00:00.000000000 +0100
+++ new/librdkafka-2.1.1/tests/0139-offset_validation_mock.c    2023-04-28 
09:54:05.000000000 +0200
@@ -0,0 +1,151 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2023, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#include "../src/rdkafka_proto.h"
+
+
+struct _produce_args {
+        const char *topic;
+        int sleep;
+        rd_kafka_conf_t *conf;
+};
+
+static int produce_concurrent_thread(void *args) {
+        rd_kafka_t *p1;
+        test_curr->exp_dr_err    = RD_KAFKA_RESP_ERR_NO_ERROR;
+        test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
+
+        struct _produce_args *produce_args = args;
+        rd_sleep(produce_args->sleep);
+
+        p1 = test_create_handle(RD_KAFKA_PRODUCER, produce_args->conf);
+        TEST_CALL_ERR__(
+            rd_kafka_producev(p1, RD_KAFKA_V_TOPIC(produce_args->topic),
+                              RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
+        rd_kafka_flush(p1, -1);
+        rd_kafka_destroy(p1);
+        return 0;
+}
+
+/**
+ * @brief Send a produce request in the middle of an offset validation
+ *        and expect that the fetched message is discarded, not producing
+ *        a duplicate when state becomes active again. See #4249.
+ */
+static void do_test_no_duplicates_during_offset_validation(void) {
+        const char *topic      = test_mk_topic_name(__FUNCTION__, 1);
+        const char *c1_groupid = topic;
+        rd_kafka_t *c1;
+        rd_kafka_conf_t *conf, *conf_producer;
+        const char *bootstraps;
+        rd_kafka_mock_cluster_t *mcluster;
+        int initial_msg_count = 5;
+        thrd_t thrd;
+        struct _produce_args args = RD_ZERO_INIT;
+        uint64_t testid           = test_id_generate();
+
+        SUB_TEST_QUICK();
+
+        mcluster = test_mock_cluster_new(1, &bootstraps);
+        rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
+
+        /* Slow down OffsetForLeaderEpoch so a produce and
+         * subsequent fetch can happen while it's in-flight */
+        rd_kafka_mock_broker_push_request_error_rtts(
+            mcluster, 1, RD_KAFKAP_OffsetForLeaderEpoch, 1,
+            RD_KAFKA_RESP_ERR_NO_ERROR, 5000);
+
+        test_conf_init(&conf_producer, NULL, 60);
+        test_conf_set(conf_producer, "bootstrap.servers", bootstraps);
+
+
+        /* Seed the topic with messages */
+        test_produce_msgs_easy_v(topic, testid, 0, 0, initial_msg_count, 10,
+                                 "bootstrap.servers", bootstraps,
+                                 "batch.num.messages", "1", NULL);
+
+        args.topic = topic;
+        /* Ensures the message is produced while an offset validation
+         * is ongoing */
+        args.sleep = 5;
+        args.conf  = conf_producer;
+        /* Spin up concurrent thread */
+        if (thrd_create(&thrd, produce_concurrent_thread, (void *)&args) !=
+            thrd_success)
+                TEST_FAIL("Failed to create thread");
+
+        test_conf_init(&conf, NULL, 60);
+
+        test_conf_set(conf, "bootstrap.servers", bootstraps);
+        /* Ensures an offset validation happens at the same
+         * time a new message is being produced */
+        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.auto.commit", "false");
+        test_conf_set(conf, "enable.auto.offset.store", "false");
+        test_conf_set(conf, "enable.partition.eof", "true");
+
+        c1 = test_create_consumer(c1_groupid, NULL, conf, NULL);
+        test_consumer_subscribe(c1, topic);
+
+        /* Consume initial messages */
+        test_consumer_poll("MSG_INIT", c1, testid, 0, 0, initial_msg_count,
+                           NULL);
+        /* EOF after initial messages */
+        test_consumer_poll("MSG_EOF", c1, testid, 1, initial_msg_count, 0,
+                           NULL);
+        /* Concurrent producer message and EOF */
+        test_consumer_poll("MSG_AND_EOF", c1, testid, 1, initial_msg_count, 1,
+                           NULL);
+        /* Only an EOF, not a duplicate message */
+        test_consumer_poll("MSG_EOF2", c1, testid, 1, initial_msg_count, 0,
+                           NULL);
+
+        thrd_join(thrd, NULL);
+
+        rd_kafka_destroy(c1);
+
+        test_mock_cluster_destroy(mcluster);
+
+        TEST_LATER_CHECK();
+        SUB_TEST_PASS();
+}
+
+int main_0139_offset_validation_mock(int argc, char **argv) {
+
+        if (test_needs_auth()) {
+                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
+                return 0;
+        }
+
+        do_test_no_duplicates_during_offset_validation();
+
+        return 0;
+}
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/librdkafka-2.1.0/tests/8001-fetch_from_follower_mock_manual.c 
new/librdkafka-2.1.1/tests/8001-fetch_from_follower_mock_manual.c
--- old/librdkafka-2.1.0/tests/8001-fetch_from_follower_mock_manual.c   
1970-01-01 01:00:00.000000000 +0100
+++ new/librdkafka-2.1.1/tests/8001-fetch_from_follower_mock_manual.c   
2023-04-28 09:54:05.000000000 +0200
@@ -0,0 +1,116 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2023, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#include "../src/rdkafka_proto.h"
+
+/**
+ * @brief Test that the #4195 segfault doesn't happen when preferred replica
+ *        lease expires and the rktp is in fetch state
+ *        RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT.
+ */
+static void do_test_fetch_from_follower_offset_retry(void) {
+        const char *bootstraps;
+        rd_kafka_mock_cluster_t *mcluster;
+        rd_kafka_conf_t *conf;
+        rd_kafka_t *c;
+        const char *topic = "test";
+        rd_kafka_topic_partition_t *rktpar;
+        rd_kafka_topic_partition_list_t *seek;
+        int i;
+
+        SUB_TEST_QUICK();
+        test_timeout_set(600);
+
+        mcluster = test_mock_cluster_new(3, &bootstraps);
+        /* Set partition leader to broker 1. */
+        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+        rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);
+
+        test_conf_init(&conf, NULL, 0);
+        test_conf_set(conf, "bootstrap.servers", bootstraps);
+        test_conf_set(conf, "client.rack", "myrack");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "fetch.error.backoff.ms", "1000");
+        test_conf_set(conf, "fetch.message.max.bytes", "10");
+        test_conf_set(conf, "session.timeout.ms", "600000");
+        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000");
+
+        c = test_create_consumer("mygroup", NULL, conf, NULL);
+
+        test_consumer_assign_partition(
+            "do_test_fetch_from_follower_offset_retry", c, topic, 0,
+            RD_KAFKA_OFFSET_INVALID);
+
+        /* Since there are no messages, this poll only waits for metadata, and
+         * then sets the preferred replica after the first fetch request.
+         * Subsequent polls are for waiting up to 5 minutes. */
+        for (i = 0; i < 7; i++) {
+                test_consumer_poll_no_msgs(
+                    "initial metadata and preferred replica set", c, 0, 40000);
+        }
+
+
+        /* Seek to end to trigger ListOffsets */
+        seek           = rd_kafka_topic_partition_list_new(1);
+        rktpar         = rd_kafka_topic_partition_list_add(seek, topic, 0);
+        rktpar->offset = RD_KAFKA_OFFSET_END;
+
+        /* Increase RTT for this ListOffsets */
+        rd_kafka_mock_broker_push_request_error_rtts(
+            mcluster, 2, RD_KAFKAP_ListOffsets, 1, RD_KAFKA_RESP_ERR_NO_ERROR,
+            40 * 1000);
+
+        rd_kafka_seek_partitions(c, seek, -1);
+        rd_kafka_topic_partition_list_destroy(seek);
+
+        /* Wait lease expiry */
+        rd_sleep(50);
+
+        test_consumer_close(c);
+
+        rd_kafka_destroy(c);
+
+        test_mock_cluster_destroy(mcluster);
+
+        SUB_TEST_PASS();
+}
+
+
+int main_8001_fetch_from_follower_mock_manual(int argc, char **argv) {
+
+        if (test_needs_auth()) {
+                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
+                return 0;
+        }
+
+        do_test_fetch_from_follower_offset_retry();
+
+        return 0;
+}
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/tests/CMakeLists.txt 
new/librdkafka-2.1.1/tests/CMakeLists.txt
--- old/librdkafka-2.1.0/tests/CMakeLists.txt   2023-04-03 21:45:45.000000000 
+0200
+++ new/librdkafka-2.1.1/tests/CMakeLists.txt   2023-04-28 09:54:05.000000000 
+0200
@@ -129,7 +129,9 @@
     0136-resolve_cb.c
     0137-barrier_batch_consume.c
     0138-admin_mock.c
+    0139-offset_validation_mock.c
     8000-idle.cpp
+    8001-fetch_from_follower_mock_manual.c
     test.c
     testcpp.cpp
     rusage.c
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/tests/Makefile 
new/librdkafka-2.1.1/tests/Makefile
--- old/librdkafka-2.1.0/tests/Makefile 2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/tests/Makefile 2023-04-28 09:54:05.000000000 +0200
@@ -12,7 +12,7 @@
 LDFLAGS += -rdynamic -L../src -L../src-cpp
 
 # Latest Kafka version
-KAFKA_VERSION?=3.1.0
+KAFKA_VERSION?=3.4.0
 # Kafka versions for compatibility tests
 COMPAT_KAFKA_VERSIONS?=0.8.2.2 0.9.0.1 0.11.0.3 1.0.2 2.4.1 2.8.1 $(KAFKA_VERSION)
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/tests/test.c 
new/librdkafka-2.1.1/tests/test.c
--- old/librdkafka-2.1.0/tests/test.c   2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/tests/test.c   2023-04-28 09:54:05.000000000 +0200
@@ -246,9 +246,12 @@
 _TEST_DECL(0136_resolve_cb);
 _TEST_DECL(0137_barrier_batch_consume);
 _TEST_DECL(0138_admin_mock);
+_TEST_DECL(0139_offset_validation_mock);
+
 
 /* Manual tests */
 _TEST_DECL(8000_idle);
+_TEST_DECL(8001_fetch_from_follower_mock_manual);
 
 
 /* Define test resource usage thresholds if the default limits
@@ -490,9 +493,11 @@
     _TEST(0136_resolve_cb, TEST_F_LOCAL),
     _TEST(0137_barrier_batch_consume, 0),
     _TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)),
+    _TEST(0139_offset_validation_mock, 0),
 
     /* Manual tests */
     _TEST(8000_idle, TEST_F_MANUAL),
+    _TEST(8001_fetch_from_follower_mock_manual, TEST_F_MANUAL),
 
     {NULL}};
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/vcpkg.json 
new/librdkafka-2.1.1/vcpkg.json
--- old/librdkafka-2.1.0/vcpkg.json     2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/vcpkg.json     2023-04-28 09:54:05.000000000 +0200
@@ -1,6 +1,6 @@
 {
     "name": "librdkafka",
-    "version": "2.1.0",
+    "version": "2.1.1",
     "dependencies": [
         {
             "name": "zstd",
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/librdkafka-2.1.0/win32/tests/tests.vcxproj 
new/librdkafka-2.1.1/win32/tests/tests.vcxproj
--- old/librdkafka-2.1.0/win32/tests/tests.vcxproj      2023-04-03 
21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/win32/tests/tests.vcxproj      2023-04-28 
09:54:05.000000000 +0200
@@ -219,7 +219,9 @@
     <ClCompile Include="..\..\tests\0136-resolve_cb.c" />
     <ClCompile Include="..\..\tests\0137-barrier_batch_consume.c" />
     <ClCompile Include="..\..\tests\0138-admin_mock.c" />
+    <ClCompile Include="..\..\tests\0139-offset_validation_mock.c" />
     <ClCompile Include="..\..\tests\8000-idle.cpp" />
+    <ClCompile Include="..\..\tests\8001-fetch_from_follower_mock_manual.c" />
     <ClCompile Include="..\..\tests\test.c" />
     <ClCompile Include="..\..\tests\testcpp.cpp" />
     <ClCompile Include="..\..\tests\rusage.c" />
