Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package librdkafka for openSUSE:Factory checked in at 2023-05-19 11:54:49 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/librdkafka (Old) and /work/SRC/openSUSE:Factory/.librdkafka.new.1533 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "librdkafka" Fri May 19 11:54:49 2023 rev:17 rq:1087789 version:2.1.1 Changes: -------- --- /work/SRC/openSUSE:Factory/librdkafka/librdkafka.changes 2023-05-03 12:56:26.407537367 +0200 +++ /work/SRC/openSUSE:Factory/.librdkafka.new.1533/librdkafka.changes 2023-05-19 11:54:55.559058705 +0200 @@ -1,0 +2,39 @@ +Sun May 7 18:58:32 UTC 2023 - Dirk Müller <[email protected]> + +- update to 2.1.1: + * Avoid duplicate messages when a fetch response is received + * in the middle of an offset validation request + * Fix segmentation fault when subscribing to a non-existent + topic and calling `rd_kafka_message_leader_epoch()` on the polled + `rkmessage` + * Fix a segmentation fault when fetching from follower and the + partition lease expires while waiting for the result of a list offsets + operation + * Fix documentation for the admin request timeout, incorrectly + stating -1 for infinite + * timeout. That timeout can't be infinite. + * Fix CMake pkg-config cURL require and use + * pkg-config `Requires.private` field + * Fixes certain cases where polling would not keep the consumer + * in the group or make it rejoin it + * Fix to the C++ set_leader_epoch method of TopicPartitionImpl, + * that wasn't storing the passed value + * Duplicate messages can be emitted when a fetch response is + received in the middle of an offset validation request. Solved by + avoiding a restart from last application offset when offset validation + succeeds. + * When fetching from follower, if the partition lease expires + after 5 minutes, and a list offsets operation was requested + to retrieve the earliest or latest offset, it resulted in + segmentation fault. This was fixed by allowing threads different + from the main one to call the `rd_kafka_toppar_set_fetch_state` + function, given they hold the lock on the `rktp`. + * In v2.1.0, a bug was fixed which caused polling any queue to + reset the `max.poll.interval.ms`. + Only certain functions were made to reset the timer, + but it is possible for the user to obtain the queue with + messages from the broker, skipping these functions. This was fixed by + encoding information in a queue itself, that, whether polling, + resets the timer. 
Old:
----
  v2.1.0.tar.gz

New:
----
  v2.1.1.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ librdkafka.spec ++++++
--- /var/tmp/diff_new_pack.bpsebj/_old  2023-05-19 11:54:58.939078050 +0200
+++ /var/tmp/diff_new_pack.bpsebj/_new  2023-05-19 11:54:58.951078118 +0200
@@ -20,10 +20,10 @@
 # lto breaks crc32 detection in configure script
 # See https://github.com/edenhill/librdkafka/issues/2426
 %ifnarch x86_64
-%define _lto_cflags %{nil}
+%global _lto_cflags %{nil}
 %endif
 Name:           librdkafka
-Version:        2.1.0
+Version:        2.1.1
 Release:        0
 Summary:        The Apache Kafka C/C++ library
 License:        BSD-2-Clause

++++++ v2.1.0.tar.gz -> v2.1.1.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/.semaphore/semaphore.yml new/librdkafka-2.1.1/.semaphore/semaphore.yml
--- old/librdkafka-2.1.0/.semaphore/semaphore.yml       2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/.semaphore/semaphore.yml       2023-04-28 09:54:05.000000000 +0200
@@ -126,7 +126,7 @@
           - make -j -C tests build
           - make -C tests run_local_quick
           - DESTDIR="$PWD/dest" make install
-          - (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.1.0 --cmd 'make quick')
+          - (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.4.0 --cmd 'make quick')

       - name: 'Linux x64: release artifact docker builds'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/CHANGELOG.md new/librdkafka-2.1.1/CHANGELOG.md
--- old/librdkafka-2.1.0/CHANGELOG.md   2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/CHANGELOG.md   2023-04-28 09:54:05.000000000 +0200
@@ -1,3 +1,42 @@
+# librdkafka v2.1.1
+
+librdkafka v2.1.1 is a maintenance release:
+
+ * Avoid duplicate messages when a fetch response is received
+   in the middle of an offset validation request (#4261).
+ * Fix segmentation fault when subscribing to a non-existent topic and
+   calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage` (#4245).
+ * Fix a segmentation fault when fetching from follower and the partition lease
+   expires while waiting for the result of a list offsets operation (#4254).
+ * Fix documentation for the admin request timeout, incorrectly stating
+   -1 for infinite timeout. That timeout can't be infinite.
+ * Fix CMake pkg-config cURL require and use
+   pkg-config `Requires.private` field (@FantasqueX, @stertingen, #4180).
+ * Fixes certain cases where polling would not keep the consumer
+   in the group or make it rejoin it (#4256).
+ * Fix to the C++ set_leader_epoch method of TopicPartitionImpl,
+   that wasn't storing the passed value (@pavel-pimenov, #4267).
+
+## Fixes
+
+### Consumer fixes
+
+ * Duplicate messages can be emitted when a fetch response is received
+   in the middle of an offset validation request. Solved by avoiding
+   a restart from last application offset when offset validation succeeds.
+ * When fetching from follower, if the partition lease expires after 5 minutes,
+   and a list offsets operation was requested to retrieve the earliest
+   or latest offset, it resulted in segmentation fault. This was fixed by
+   allowing threads different from the main one to call
+   the `rd_kafka_toppar_set_fetch_state` function, given they hold
+   the lock on the `rktp`.
+ * In v2.1.0, a bug was fixed which caused polling any queue to reset the
+   `max.poll.interval.ms`.
+   Only certain functions were made to reset the timer,
+   but it is possible for the user to obtain the queue with
+   messages from the broker, skipping these functions. This was fixed
+   by encoding the information in the queue itself, so that polling it
+   resets the timer.
+
+
 # librdkafka v2.1.0

 librdkafka v2.1.0 is a feature release:
@@ -64,11 +103,18 @@
    any of the **seek**, **pause**, **resume** or **rebalancing** operation,
    `on_consume` interceptors might be called incorrectly (maybe multiple times)
    for not consumed messages.

+### Consume API
+
+ * Duplicate messages can be emitted when a fetch response is received
+   in the middle of an offset validation request.
+ * Segmentation fault when subscribing to a non-existent topic and
+   calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage`.
+

 # librdkafka v2.0.2

-librdkafka v2.0.2 is a bugfix release:
+librdkafka v2.0.2 is a maintenance release:

 * Fix OpenSSL version in Win32 nuget package (#4152).

@@ -76,7 +122,7 @@

 # librdkafka v2.0.1

-librdkafka v2.0.1 is a bugfix release:
+librdkafka v2.0.1 is a maintenance release:

 * Fixed nuget package for Linux ARM64 release (#4150).

diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/packaging/cmake/rdkafka.pc.in new/librdkafka-2.1.1/packaging/cmake/rdkafka.pc.in
--- old/librdkafka-2.1.0/packaging/cmake/rdkafka.pc.in  2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/packaging/cmake/rdkafka.pc.in  2023-04-28 09:54:05.000000000 +0200
@@ -6,7 +6,7 @@
 Name: @PKG_CONFIG_NAME@
 Description: @PKG_CONFIG_DESCRIPTION@
 Version: @PKG_CONFIG_VERSION@
-Requires: @PKG_CONFIG_REQUIRES@
+Requires.private: @PKG_CONFIG_REQUIRES_PRIVATE@
 Cflags: @PKG_CONFIG_CFLAGS@
 Libs: @PKG_CONFIG_LIBS@
 Libs.private: @PKG_CONFIG_LIBS_PRIVATE@
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/CMakeLists.txt new/librdkafka-2.1.1/src/CMakeLists.txt
--- old/librdkafka-2.1.0/src/CMakeLists.txt     2023-04-03 21:45:45.000000000 +0200
+++ new/librdkafka-2.1.1/src/CMakeLists.txt     2023-04-28 09:54:05.000000000 +0200
@@ -199,7 +199,7 @@

 if(WITH_CURL)
   find_package(CURL REQUIRED)
-  target_include_directories(rdkafka PUBLIC ${CURL_INCLUDE_DIRS})
+  target_include_directories(rdkafka PRIVATE ${CURL_INCLUDE_DIRS})
   target_link_libraries(rdkafka PUBLIC ${CURL_LIBRARIES})
 endif()

@@ -272,7 +272,7 @@

 # Generate pkg-config file
 set(PKG_CONFIG_VERSION "${PROJECT_VERSION}")
-set(PKG_CONFIG_REQUIRES "")
+set(PKG_CONFIG_REQUIRES_PRIVATE "")
 if (WIN32)
   set(PKG_CONFIG_LIBS_PRIVATE "-lws2_32 -lsecur32 -lcrypt32")
 else()
@@ -296,27 +296,27 @@
 set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library")

 if(WITH_CURL)
-  string(APPEND PKG_CONFIG_REQUIRES "curl ")
+  string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libcurl ")
 endif()

 if(WITH_ZLIB)
-  string(APPEND PKG_CONFIG_REQUIRES "zlib ")
+  string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "zlib ")
 endif()

 if(WITH_SSL)
-  string(APPEND PKG_CONFIG_REQUIRES "libssl ")
+  string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libcrypto libssl ")
 endif()

 if(WITH_SASL_CYRUS)
-  string(APPEND PKG_CONFIG_REQUIRES "libsasl2 ")
+  string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libsasl2 ")
 endif()

 if(WITH_ZSTD)
-  string(APPEND PKG_CONFIG_REQUIRES "libzstd ")
+  string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libzstd ")
 endif()

 if(WITH_LZ4_EXT)
-  string(APPEND PKG_CONFIG_REQUIRES "liblz4 ")
+  string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "liblz4 ")
 endif()

 set(PKG_CONFIG_CFLAGS "-I\${includedir}")
diff -urN '--exclude=CVS'
'--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka.c new/librdkafka-2.1.1/src/rdkafka.c --- old/librdkafka-2.1.0/src/rdkafka.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka.c 2023-04-28 09:54:05.000000000 +0200 @@ -4003,20 +4003,37 @@ int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) { int r; + const rd_bool_t can_q_contain_fetched_msgs = + rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK); + + if (timeout_ms && can_q_contain_fetched_msgs) + rd_kafka_app_poll_blocking(rk); r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled(rk); + return r; } rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) { rd_kafka_op_t *rko; + const rd_bool_t can_q_contain_fetched_msgs = + rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK); + + + if (timeout_ms && can_q_contain_fetched_msgs) + rd_kafka_app_poll_blocking(rkqu->rkqu_rk); rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0, RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL); + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled(rkqu->rkqu_rk); + if (!rko) return NULL; @@ -4025,10 +4042,18 @@ int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) { int r; + const rd_bool_t can_q_contain_fetched_msgs = + rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK); + + if (timeout_ms && can_q_contain_fetched_msgs) + rd_kafka_app_poll_blocking(rkqu->rkqu_rk); r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled(rkqu->rkqu_rk); + return r; } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka.h new/librdkafka-2.1.1/src/rdkafka.h --- old/librdkafka-2.1.0/src/rdkafka.h 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka.h 2023-04-28 09:54:05.000000000 +0200 @@ -166,7 +166,7 @@ * @remark This value should only be used during compile time, * for runtime checks of version use rd_kafka_version() */ -#define RD_KAFKA_VERSION 0x020100ff +#define RD_KAFKA_VERSION 0x020101ff /** * @brief Returns the librdkafka version as integer. @@ -3431,6 +3431,12 @@ * * @remark rd_kafka_queue_destroy() MUST be called on this queue * prior to calling rd_kafka_consumer_close(). + * @remark Polling the returned queue counts as a consumer poll, and will reset + * the timer for max.poll.interval.ms. If this queue is forwarded to a + * "destq", polling destq also counts as a consumer poll (this works + * for any number of forwards). However, even if this queue is + * unforwarded or forwarded elsewhere, polling destq will continue + * to count as a consumer poll. */ RD_EXPORT rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk); @@ -6765,8 +6771,7 @@ * request transmission, operation time on broker, and response. * * @param options Admin options. - * @param timeout_ms Timeout in milliseconds, use -1 for indefinite timeout. - * Defaults to `socket.timeout.ms`. + * @param timeout_ms Timeout in milliseconds. Defaults to `socket.timeout.ms`. * @param errstr A human readable error string (nul-terminated) is written to * this location that must be of at least \p errstr_size bytes. * The \p errstr is only written in case of error. 
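As context for the RD_KAFKA_VERSION bump above (0x020100ff to 0x020101ff): the
value is hex-encoded as major, minor, revision and pre-release id bytes, with
0xff denoting a final release. A small sketch, assuming only the public
librdkafka header, that decodes it and compares the compile-time header with
the runtime library:

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    int main(void) {
            int v = rd_kafka_version(); /* runtime library version */

            /* Layout 0xMMmmrrxx; for 2.1.1 both values are 0x020101ff. */
            printf("built against 0x%08x, running %d.%d.%d (%s)\n",
                   (unsigned)RD_KAFKA_VERSION,
                   (v >> 24) & 0xff,   /* major */
                   (v >> 16) & 0xff,   /* minor */
                   (v >> 8) & 0xff,    /* revision */
                   rd_kafka_version_str());
            return 0;
    }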
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_cgrp.c new/librdkafka-2.1.1/src/rdkafka_cgrp.c --- old/librdkafka-2.1.0/src/rdkafka_cgrp.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_cgrp.c 2023-04-28 09:54:05.000000000 +0200 @@ -415,7 +415,7 @@ rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk); rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve; rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque; - rkcg->rkcg_q = rd_kafka_q_new(rk); + rkcg->rkcg_q = rd_kafka_consume_q_new(rk); rkcg->rkcg_group_instance_id = rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1); diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_fetcher.c new/librdkafka-2.1.1/src/rdkafka_fetcher.c --- old/librdkafka-2.1.0/src/rdkafka_fetcher.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_fetcher.c 2023-04-28 09:54:05.000000000 +0200 @@ -975,7 +975,25 @@ return cnt; } - +/** + * @brief Decide whether it should start fetching from next fetch start + * or continue with current fetch pos. + * + * @param rktp the toppar + * + * @returns rd_true if it should start fetching from next fetch start, + * rd_false otherwise. + * + * @locality any + * @locks toppar_lock() MUST be held + */ +rd_bool_t rd_kafka_toppar_fetch_decide_start_from_next_fetch_start( + rd_kafka_toppar_t *rktp) { + return rktp->rktp_op_version > rktp->rktp_fetch_version || + rd_kafka_fetch_pos_cmp(&rktp->rktp_next_fetch_start, + &rktp->rktp_last_next_fetch_start) || + rktp->rktp_offsets.fetch_pos.offset == RD_KAFKA_OFFSET_INVALID; +} /** * @brief Decide whether this toppar should be on the fetch list or not. @@ -1037,10 +1055,7 @@ /* Update broker thread's fetch op version */ version = rktp->rktp_op_version; - if (version > rktp->rktp_fetch_version || - rd_kafka_fetch_pos_cmp(&rktp->rktp_next_fetch_start, - &rktp->rktp_last_next_fetch_start) || - rktp->rktp_offsets.fetch_pos.offset == RD_KAFKA_OFFSET_INVALID) { + if (rd_kafka_toppar_fetch_decide_start_from_next_fetch_start(rktp)) { /* New version barrier, something was modified from the * control plane. Reset and start over. 
* Alternatively only the next_offset changed but not the diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_fetcher.h new/librdkafka-2.1.1/src/rdkafka_fetcher.h --- old/librdkafka-2.1.0/src/rdkafka_fetcher.h 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_fetcher.h 2023-04-28 09:54:05.000000000 +0200 @@ -33,6 +33,9 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now); +rd_bool_t rd_kafka_toppar_fetch_decide_start_from_next_fetch_start( + rd_kafka_toppar_t *rktp); + rd_ts_t rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t *rktp, rd_kafka_broker_t *rkb, int force_remove); diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_msg.c new/librdkafka-2.1.1/src/rdkafka_msg.c --- old/librdkafka-2.1.0/src/rdkafka_msg.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_msg.c 2023-04-28 09:54:05.000000000 +0200 @@ -1562,8 +1562,8 @@ int32_t rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage) { rd_kafka_msg_t *rkm; - - if (unlikely(!rkmessage->rkt || + if (unlikely(!rkmessage->rkt || rd_kafka_rkt_is_lw(rkmessage->rkt) || + !rkmessage->rkt->rkt_rk || rkmessage->rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER)) return -1; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_offset.c new/librdkafka-2.1.1/src/rdkafka_offset.c --- old/librdkafka-2.1.0/src/rdkafka_offset.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_offset.c 2023-04-28 09:54:05.000000000 +0200 @@ -643,7 +643,8 @@ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt); rd_kafka_toppar_t *rktp; rd_kafka_resp_err_t err; - rd_kafka_fetch_pos_t pos = {offset + 1, -1 /*no leader epoch known*/}; + rd_kafka_fetch_pos_t pos = + RD_KAFKA_FETCH_POS(offset + 1, -1 /*no leader epoch known*/); /* Find toppar */ rd_kafka_topic_rdlock(rkt); @@ -675,7 +676,8 @@ for (i = 0; i < offsets->cnt; i++) { rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; rd_kafka_toppar_t *rktp; - rd_kafka_fetch_pos_t pos = {rktpar->offset, -1}; + rd_kafka_fetch_pos_t pos = + RD_KAFKA_FETCH_POS(rktpar->offset, -1); rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar, rd_false); @@ -721,8 +723,8 @@ "Invalid message object, " "not a consumed message"); - pos.offset = rkmessage->offset + 1; - pos.leader_epoch = rkm->rkm_u.consumer.leader_epoch; + pos = RD_KAFKA_FETCH_POS(rkmessage->offset + 1, + rkm->rkm_u.consumer.leader_epoch); err = rd_kafka_offset_store0(rktp, pos, rd_false /* Don't force */, RD_DO_LOCK); @@ -956,9 +958,6 @@ "supported by broker: validation skipped", RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition); - /* Reset the epoch to -1 since it can't be used with - * older brokers. */ - rktp->rktp_next_fetch_start.leader_epoch = -1; rd_kafka_toppar_set_fetch_state( rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE); goto done; @@ -1020,17 +1019,18 @@ if (end_offset < 0 || end_offset_leader_epoch < 0) { rd_kafka_offset_reset( - rktp, rd_kafka_broker_id(rkb), rktp->rktp_next_fetch_start, + rktp, rd_kafka_broker_id(rkb), + rktp->rktp_offset_validation_pos, RD_KAFKA_RESP_ERR__LOG_TRUNCATION, "No epoch found less or equal to " "%s: broker end offset is %" PRId64 " (offset leader epoch %" PRId32 ")." 
" Reset using configured policy.", - rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start), + rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos), end_offset, end_offset_leader_epoch); - } else if (end_offset < rktp->rktp_next_fetch_start.offset) { + } else if (end_offset < rktp->rktp_offset_validation_pos.offset) { if (rktp->rktp_rkt->rkt_conf.auto_offset_reset == RD_KAFKA_OFFSET_INVALID /* auto.offset.reset=error */) { @@ -1044,7 +1044,8 @@ " (offset leader epoch %" PRId32 "). " "Reset to INVALID.", - rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start), + rd_kafka_fetch_pos2str( + rktp->rktp_offset_validation_pos), end_offset, end_offset_leader_epoch); } else { @@ -1074,8 +1075,6 @@ rktp->rktp_partition, end_offset, end_offset_leader_epoch); - rktp->rktp_next_fetch_start.leader_epoch = - end_offset_leader_epoch; rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE); } @@ -1166,7 +1165,7 @@ * there is no point in doing validation. * This is the case for epoch-less seek()s or epoch-less * committed offsets. */ - if (rktp->rktp_next_fetch_start.leader_epoch == -1) { + if (rktp->rktp_offset_validation_pos.leader_epoch == -1) { rd_kafka_dbg( rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE", "%.*s [%" PRId32 @@ -1174,7 +1173,7 @@ "validation for %s: no leader epoch set", RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition, - rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start)); + rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos)); rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE); return; @@ -1188,18 +1187,18 @@ rktpar = rd_kafka_topic_partition_list_add( parts, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); rd_kafka_topic_partition_set_leader_epoch( - rktpar, rktp->rktp_next_fetch_start.leader_epoch); + rktpar, rktp->rktp_offset_validation_pos.leader_epoch); rd_kafka_topic_partition_set_current_leader_epoch( rktpar, rktp->rktp_leader_epoch); rd_kafka_toppar_keep(rktp); /* for request opaque */ - rd_rkb_dbg(rktp->rktp_leader, FETCH, "VALIDATE", - "%.*s [%" PRId32 - "]: querying broker for epoch " - "validation of %s: %s", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start), reason); + rd_rkb_dbg( + rktp->rktp_leader, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: querying broker for epoch " + "validation of %s: %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos), reason); rd_kafka_OffsetForLeaderEpochRequest( rktp->rktp_leader, parts, RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_partition.c new/librdkafka-2.1.1/src/rdkafka_partition.c --- old/librdkafka-2.1.0/src/rdkafka_partition.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_partition.c 2023-04-28 09:54:05.000000000 +0200 @@ -243,6 +243,7 @@ rd_kafka_fetch_pos_init(&rktp->rktp_query_pos); rd_kafka_fetch_pos_init(&rktp->rktp_next_fetch_start); rd_kafka_fetch_pos_init(&rktp->rktp_last_next_fetch_start); + rd_kafka_fetch_pos_init(&rktp->rktp_offset_validation_pos); rd_kafka_fetch_pos_init(&rktp->rktp_app_pos); rd_kafka_fetch_pos_init(&rktp->rktp_stored_pos); rd_kafka_fetch_pos_init(&rktp->rktp_committing_pos); @@ -252,7 +253,7 @@ mtx_init(&rktp->rktp_lock, mtx_plain); rd_refcnt_init(&rktp->rktp_refcnt, 0); - rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk); + rktp->rktp_fetchq = 
rd_kafka_consume_q_new(rkt->rkt_rk); rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk); rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve; rktp->rktp_ops->rkq_opaque = rktp; @@ -359,9 +360,6 @@ * @locks_required rd_kafka_toppar_lock() MUST be held. */ void rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t *rktp, int fetch_state) { - rd_kafka_assert(NULL, - thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)); - if ((int)rktp->rktp_fetch_state == fetch_state) return; @@ -1798,6 +1796,7 @@ rd_kafka_toppar_set_fetch_state( rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT); rd_kafka_toppar_set_next_fetch_position(rktp, pos); + rd_kafka_toppar_set_offset_validation_position(rktp, pos); rd_kafka_offset_validate(rktp, "seek"); } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_partition.h new/librdkafka-2.1.1/src/rdkafka_partition.h --- old/librdkafka-2.1.0/src/rdkafka_partition.h 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_partition.h 2023-04-28 09:54:05.000000000 +0200 @@ -325,6 +325,10 @@ * @locality toppar thread */ rd_kafka_fetch_pos_t rktp_last_next_fetch_start; + /** The offset to verify. + * @locality toppar thread */ + rd_kafka_fetch_pos_t rktp_offset_validation_pos; + /** Application's position. * This is the latest offset delivered to application + 1. * It is reset to INVALID_OFFSET when partition is @@ -1047,7 +1051,7 @@ * @brief Set's the partitions next fetch position, i.e., the next offset * to start fetching from. * - * @locks_required rd_kafka_toppar_lock(rktp) MUST be held. + * @locks rd_kafka_toppar_lock(rktp) MUST be held. */ static RD_UNUSED RD_INLINE void rd_kafka_toppar_set_next_fetch_position(rd_kafka_toppar_t *rktp, @@ -1055,4 +1059,15 @@ rktp->rktp_next_fetch_start = next_pos; } +/** + * @brief Sets the offset validation position. + * + * @locks rd_kafka_toppar_lock(rktp) MUST be held. + */ +static RD_UNUSED RD_INLINE void rd_kafka_toppar_set_offset_validation_position( + rd_kafka_toppar_t *rktp, + rd_kafka_fetch_pos_t offset_validation_pos) { + rktp->rktp_offset_validation_pos = offset_validation_pos; +} + #endif /* _RDKAFKA_PARTITION_H_ */ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_queue.c new/librdkafka-2.1.1/src/rdkafka_queue.c --- old/librdkafka-2.1.0/src/rdkafka_queue.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_queue.c 2023-04-28 09:54:05.000000000 +0200 @@ -83,12 +83,15 @@ */ void rd_kafka_q_init0(rd_kafka_q_t *rkq, rd_kafka_t *rk, + rd_bool_t for_consume, const char *func, int line) { rd_kafka_q_reset(rkq); rkq->rkq_fwdq = NULL; rkq->rkq_refcnt = 1; rkq->rkq_flags = RD_KAFKA_Q_F_READY; + if (for_consume) + rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER; rkq->rkq_rk = rk; rkq->rkq_qio = NULL; rkq->rkq_serve = NULL; @@ -106,9 +109,15 @@ /** * Allocate a new queue and initialize it. 
*/ -rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line) { +rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, + rd_bool_t for_consume, + const char *func, + int line) { rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq)); - rd_kafka_q_init(rkq, rk); + if (!for_consume) + rd_kafka_q_init(rkq, rk); + else + rd_kafka_consume_q_init(rkq, rk); rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED; #if ENABLE_DEVEL rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line); @@ -118,6 +127,33 @@ return rkq; } +/* + * Sets the flag RD_KAFKA_Q_F_CONSUMER for rkq, any queues it's being forwarded + * to, recursively. + * Setting this flag indicates that polling this queue is equivalent to calling + * consumer poll, and will reset the max.poll.interval.ms timer. Only used + * internally when forwarding queues. + * @locks rd_kafka_q_lock(rkq) + */ +static void rd_kafka_q_consumer_propagate(rd_kafka_q_t *rkq) { + mtx_lock(&rkq->rkq_lock); + rkq->rkq_flags |= RD_KAFKA_Q_F_CONSUMER; + + if (!rkq->rkq_fwdq) { + mtx_unlock(&rkq->rkq_lock); + return; + } + + /* Recursively propagate the flag to any queues rkq is already + * forwarding to. There will be a deadlock here if the queues are being + * forwarded circularly, but that is a user error. We can't resolve this + * deadlock by unlocking before the recursive call, because that leads + * to incorrectness if the rkq_fwdq is forwarded elsewhere and the old + * one destroyed between recursive calls. */ + rd_kafka_q_consumer_propagate(rkq->rkq_fwdq); + mtx_unlock(&rkq->rkq_lock); +} + /** * Set/clear forward queue. * Queue forwarding enables message routing inside rdkafka. @@ -152,6 +188,9 @@ } srcq->rkq_fwdq = destq; + + if (srcq->rkq_flags & RD_KAFKA_Q_F_CONSUMER) + rd_kafka_q_consumer_propagate(destq); } if (do_lock) mtx_unlock(&srcq->rkq_lock); @@ -610,6 +649,7 @@ rd_kafka_q_destroy(fwdq); return cnt; } + mtx_unlock(&rkq->rkq_lock); if (timeout_ms) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_queue.h new/librdkafka-2.1.1/src/rdkafka_queue.h --- old/librdkafka-2.1.0/src/rdkafka_queue.h 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_queue.h 2023-04-28 09:54:05.000000000 +0200 @@ -75,6 +75,11 @@ * by triggering the cond-var \ * but without having to enqueue \ * an op. */ +#define RD_KAFKA_Q_F_CONSUMER \ + 0x10 /* If this flag is set, this queue might contain fetched messages \ + from partitions. Polling this queue will reset the \ + max.poll.interval.ms timer. Once set, this flag is never \ + reset. 
*/ rd_kafka_t *rkq_rk; struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */ @@ -123,12 +128,20 @@ void rd_kafka_q_init0(rd_kafka_q_t *rkq, rd_kafka_t *rk, + rd_bool_t for_consume, const char *func, int line); #define rd_kafka_q_init(rkq, rk) \ - rd_kafka_q_init0(rkq, rk, __FUNCTION__, __LINE__) -rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line); -#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, __FUNCTION__, __LINE__) + rd_kafka_q_init0(rkq, rk, rd_false, __FUNCTION__, __LINE__) +#define rd_kafka_consume_q_init(rkq, rk) \ + rd_kafka_q_init0(rkq, rk, rd_true, __FUNCTION__, __LINE__) +rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, + rd_bool_t for_consume, + const char *func, + int line); +#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, rd_false, __FUNCTION__, __LINE__) +#define rd_kafka_consume_q_new(rk) \ + rd_kafka_q_new0(rk, rd_true, __FUNCTION__, __LINE__) void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq); #define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock) @@ -1164,6 +1177,22 @@ return rko; } +/** + * @brief Returns true if the queue can contain fetched messages. + * + * @locks rd_kafka_q_lock(rkq) if do_lock is set. + */ +static RD_INLINE RD_UNUSED rd_bool_t +rd_kafka_q_can_contain_fetched_msgs(rd_kafka_q_t *rkq, rd_bool_t do_lock) { + rd_bool_t val; + if (do_lock) + mtx_lock(&rkq->rkq_lock); + val = rkq->rkq_flags & RD_KAFKA_Q_F_CONSUMER; + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + return val; +} + /**@}*/ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src/rdkafka_topic.c new/librdkafka-2.1.1/src/rdkafka_topic.c --- old/librdkafka-2.1.0/src/rdkafka_topic.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src/rdkafka_topic.c 2023-04-28 09:54:05.000000000 +0200 @@ -39,6 +39,7 @@ #include "rdsysqueue.h" #include "rdtime.h" #include "rdregex.h" +#include "rdkafka_fetcher.h" #if WITH_ZSTD #include <zstd.h> @@ -725,11 +726,16 @@ } if (need_epoch_validation) { - /* Update next fetch position, that could be stale since last - * fetch start. Only if the app pos is real. */ - if (rktp->rktp_app_pos.offset > 0) { - rd_kafka_toppar_set_next_fetch_position( - rktp, rktp->rktp_app_pos); + /* Set offset validation position, + * depending it if should continue with current position or + * with next fetch start position. 
*/ + if (rd_kafka_toppar_fetch_decide_start_from_next_fetch_start( + rktp)) { + rd_kafka_toppar_set_offset_validation_position( + rktp, rktp->rktp_next_fetch_start); + } else { + rd_kafka_toppar_set_offset_validation_position( + rktp, rktp->rktp_offsets.fetch_pos); } rd_kafka_offset_validate(rktp, "epoch updated from metadata"); } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src-cpp/CMakeLists.txt new/librdkafka-2.1.1/src-cpp/CMakeLists.txt --- old/librdkafka-2.1.0/src-cpp/CMakeLists.txt 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src-cpp/CMakeLists.txt 2023-04-28 09:54:05.000000000 +0200 @@ -41,7 +41,7 @@ if(NOT RDKAFKA_BUILD_STATIC) set(PKG_CONFIG_NAME "librdkafka++") set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library") - set(PKG_CONFIG_REQUIRES "rdkafka") + set(PKG_CONFIG_REQUIRES_PRIVATE "rdkafka") set(PKG_CONFIG_CFLAGS "-I\${includedir}") set(PKG_CONFIG_LIBS "-L\${libdir} -lrdkafka++") set(PKG_CONFIG_LIBS_PRIVATE "-lrdkafka") @@ -57,7 +57,7 @@ else() set(PKG_CONFIG_NAME "librdkafka++-static") set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library (static)") - set(PKG_CONFIG_REQUIRES "") + set(PKG_CONFIG_REQUIRES_PRIVATE "") set(PKG_CONFIG_CFLAGS "-I\${includedir} -DLIBRDKAFKA_STATICLIB") set(PKG_CONFIG_LIBS "-L\${libdir} \${libdir}/librdkafka++.a") if(WIN32) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src-cpp/rdkafkacpp.h new/librdkafka-2.1.1/src-cpp/rdkafkacpp.h --- old/librdkafka-2.1.0/src-cpp/rdkafkacpp.h 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src-cpp/rdkafkacpp.h 2023-04-28 09:54:05.000000000 +0200 @@ -111,7 +111,7 @@ * @remark This value should only be used during compile time, * for runtime checks of version use RdKafka::version() */ -#define RD_KAFKA_VERSION 0x020100ff +#define RD_KAFKA_VERSION 0x020101ff /** * @brief Returns the librdkafka version as integer. 
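The rd_kafka_message_leader_epoch() hardening earlier in this diff
(rdkafka_msg.c) is exercised by the 0033-regex_subscribe.c change below. A
minimal consumer-side sketch of the call pattern, assuming a subscribed
consumer; the helper name poll_one and the handle rk are illustrative:

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* With 2.1.1, calling rd_kafka_message_leader_epoch() on an error
     * message (e.g. RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART for a
     * non-existent topic) returns -1 instead of dereferencing the
     * lightweight topic object. */
    static void poll_one(rd_kafka_t *rk) {
            rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 1000);
            if (!rkm)
                    return; /* poll timed out */

            int32_t epoch = rd_kafka_message_leader_epoch(rkm);
            if (rkm->err)
                    fprintf(stderr, "event error: %s (leader epoch %d)\n",
                            rd_kafka_message_errstr(rkm), (int)epoch);
            else
                    printf("offset %lld, leader epoch %d\n",
                           (long long)rkm->offset, (int)epoch);

            rd_kafka_message_destroy(rkm);
    }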
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h new/librdkafka-2.1.1/src-cpp/rdkafkacpp_int.h --- old/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/src-cpp/rdkafkacpp_int.h 2023-04-28 09:54:05.000000000 +0200 @@ -1289,7 +1289,7 @@ } void set_leader_epoch(int32_t leader_epoch) { - leader_epoch_ = leader_epoch_; + leader_epoch_ = leader_epoch; } std::ostream &operator<<(std::ostream &ostrm) const { diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/tests/0033-regex_subscribe.c new/librdkafka-2.1.1/tests/0033-regex_subscribe.c --- old/librdkafka-2.1.0/tests/0033-regex_subscribe.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/tests/0033-regex_subscribe.c 2023-04-28 09:54:05.000000000 +0200 @@ -174,6 +174,13 @@ rkmessage->partition, rkmessage->offset); } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) { + /* Test segfault associated with this call is solved */ + int32_t leader_epoch = rd_kafka_message_leader_epoch(rkmessage); + TEST_ASSERT(leader_epoch == -1, + "rd_kafka_message_leader_epoch should be -1" + ", got %" PRId32, + leader_epoch); + if (strstr(rd_kafka_topic_name(rkmessage->rkt), "NONEXIST")) TEST_SAY("%s: %s: error is expected for this topic\n", rd_kafka_topic_name(rkmessage->rkt), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/tests/0089-max_poll_interval.c new/librdkafka-2.1.1/tests/0089-max_poll_interval.c --- old/librdkafka-2.1.0/tests/0089-max_poll_interval.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/tests/0089-max_poll_interval.c 2023-04-28 09:54:05.000000000 +0200 @@ -351,8 +351,95 @@ SUB_TEST_PASS(); } + +/** + * @brief Consumer should be able to rejoin the group just by polling after + * leaving due to a max.poll.interval.ms timeout. The poll does not need to + * go through any special function, any queue containing consumer messages + * should suffice. + * We test with the result of rd_kafka_queue_get_consumer, and an arbitrary + * queue that is forwarded to by the result of rd_kafka_queue_get_consumer. + */ +static void +do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) { + const char *topic = test_mk_topic_name("0089_max_poll_interval", 1); + rd_kafka_conf_t *conf; + char groupid[64]; + rd_kafka_t *rk = NULL; + rd_kafka_queue_t *consumer_queue = NULL; + rd_kafka_event_t *event = NULL; + rd_kafka_queue_t *polling_queue = NULL; + + SUB_TEST("Testing with forward_to_another_q = %d", + forward_to_another_q); + + test_create_topic(NULL, topic, 1, 1); + + test_str_id_generate(groupid, sizeof(groupid)); + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "session.timeout.ms", "6000"); + test_conf_set(conf, "max.poll.interval.ms", "10000" /*10s*/); + test_conf_set(conf, "partition.assignment.strategy", "range"); + + /* We need to specify a non-NULL rebalance CB to get events of type + * RD_KAFKA_EVENT_REBALANCE. 
*/ + rk = test_create_consumer(groupid, test_rebalance_cb, conf, NULL); + + consumer_queue = rd_kafka_queue_get_consumer(rk); + + test_consumer_subscribe(rk, topic); + + if (forward_to_another_q) { + polling_queue = rd_kafka_queue_new(rk); + rd_kafka_queue_forward(consumer_queue, polling_queue); + } else + polling_queue = consumer_queue; + + event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE, + (int)(test_timeout_multiplier * 10000)); + TEST_ASSERT(event, + "Did not get a rebalance event for initial group join"); + TEST_ASSERT(rd_kafka_event_error(event) == + RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, + "Group join should assign partitions"); + rd_kafka_assign(rk, rd_kafka_event_topic_partition_list(event)); + rd_kafka_event_destroy(event); + + rd_sleep(10 + 1); /* Exceed max.poll.interval.ms. */ + + /* Note that by polling for the group leave, we're also polling the + * consumer queue, and hence it should trigger a rejoin. */ + event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE, + (int)(test_timeout_multiplier * 10000)); + TEST_ASSERT(event, "Did not get a rebalance event for the group leave"); + TEST_ASSERT(rd_kafka_event_error(event) == + RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, + "Group leave should revoke partitions"); + rd_kafka_assign(rk, NULL); + rd_kafka_event_destroy(event); + + event = test_wait_event(polling_queue, RD_KAFKA_EVENT_REBALANCE, + (int)(test_timeout_multiplier * 10000)); + TEST_ASSERT(event, "Should get a rebalance event for the group rejoin"); + TEST_ASSERT(rd_kafka_event_error(event) == + RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, + "Group rejoin should assign partitions"); + rd_kafka_assign(rk, rd_kafka_event_topic_partition_list(event)); + rd_kafka_event_destroy(event); + + if (forward_to_another_q) + rd_kafka_queue_destroy(polling_queue); + rd_kafka_queue_destroy(consumer_queue); + test_consumer_close(rk); + rd_kafka_destroy(rk); + + SUB_TEST_PASS(); +} + int main_0089_max_poll_interval(int argc, char **argv) { do_test(); do_test_with_log_queue(); + do_test_rejoin_after_interval_expire(rd_false); + do_test_rejoin_after_interval_expire(rd_true); return 0; } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/tests/0139-offset_validation_mock.c new/librdkafka-2.1.1/tests/0139-offset_validation_mock.c --- old/librdkafka-2.1.0/tests/0139-offset_validation_mock.c 1970-01-01 01:00:00.000000000 +0100 +++ new/librdkafka-2.1.1/tests/0139-offset_validation_mock.c 2023-04-28 09:54:05.000000000 +0200 @@ -0,0 +1,151 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +#include "../src/rdkafka_proto.h" + + +struct _produce_args { + const char *topic; + int sleep; + rd_kafka_conf_t *conf; +}; + +static int produce_concurrent_thread(void *args) { + rd_kafka_t *p1; + test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR; + test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED; + + struct _produce_args *produce_args = args; + rd_sleep(produce_args->sleep); + + p1 = test_create_handle(RD_KAFKA_PRODUCER, produce_args->conf); + TEST_CALL_ERR__( + rd_kafka_producev(p1, RD_KAFKA_V_TOPIC(produce_args->topic), + RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END)); + rd_kafka_flush(p1, -1); + rd_kafka_destroy(p1); + return 0; +} + +/** + * @brief Send a produce request in the middle of an offset validation + * and expect that the fetched message is discarded, don't producing + * a duplicate when state becomes active again. See #4249. + */ +static void do_test_no_duplicates_during_offset_validation(void) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + rd_kafka_conf_t *conf, *conf_producer; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + int initial_msg_count = 5; + thrd_t thrd; + struct _produce_args args = RD_ZERO_INIT; + uint64_t testid = test_id_generate(); + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 1); + + /* Slow down OffsetForLeaderEpoch so a produce and + * subsequent fetch can happen while it's in-flight */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 1, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_NO_ERROR, 5000); + + test_conf_init(&conf_producer, NULL, 60); + test_conf_set(conf_producer, "bootstrap.servers", bootstraps); + + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, initial_msg_count, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + args.topic = topic; + /* Makes that the message is produced while an offset validation + * is ongoing */ + args.sleep = 5; + args.conf = conf_producer; + /* Spin up concurrent thread */ + if (thrd_create(&thrd, produce_concurrent_thread, (void *)&args) != + thrd_success) + TEST_FAIL("Failed to create thread"); + + test_conf_init(&conf, NULL, 60); + + test_conf_set(conf, "bootstrap.servers", bootstraps); + /* Makes that an offset validation happens at the same + * time a new message is being produced */ + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "enable.auto.offset.store", "false"); + test_conf_set(conf, "enable.partition.eof", "true"); + + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + test_consumer_subscribe(c1, topic); + + /* Consume initial messages */ + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, initial_msg_count, + NULL); + /* EOF after 
initial messages */ + test_consumer_poll("MSG_EOF", c1, testid, 1, initial_msg_count, 0, + NULL); + /* Concurrent producer message and EOF */ + test_consumer_poll("MSG_AND_EOF", c1, testid, 1, initial_msg_count, 1, + NULL); + /* Only an EOF, not a duplicate message */ + test_consumer_poll("MSG_EOF2", c1, testid, 1, initial_msg_count, 0, + NULL); + + thrd_join(thrd, NULL); + + rd_kafka_destroy(c1); + + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} + +int main_0139_offset_validation_mock(int argc, char **argv) { + + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL\n"); + return 0; + } + + do_test_no_duplicates_during_offset_validation(); + + return 0; +} diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/tests/8001-fetch_from_follower_mock_manual.c new/librdkafka-2.1.1/tests/8001-fetch_from_follower_mock_manual.c --- old/librdkafka-2.1.0/tests/8001-fetch_from_follower_mock_manual.c 1970-01-01 01:00:00.000000000 +0100 +++ new/librdkafka-2.1.1/tests/8001-fetch_from_follower_mock_manual.c 2023-04-28 09:54:05.000000000 +0200 @@ -0,0 +1,116 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +#include "../src/rdkafka_proto.h" + +/** + * @brief Test that the #4195 segfault doesn't happen when preferred replica + * lease expires and the rktp is in fetch state + * RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT. + */ +static void do_test_fetch_from_follower_offset_retry(void) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + rd_kafka_topic_partition_t *rktpar; + rd_kafka_topic_partition_list_t *seek; + int i; + + SUB_TEST_QUICK(); + test_timeout_set(600); + + mcluster = test_mock_cluster_new(3, &bootstraps); + /* Set partition leader to broker 1. 
*/ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "fetch.error.backoff.ms", "1000"); + test_conf_set(conf, "fetch.message.max.bytes", "10"); + test_conf_set(conf, "session.timeout.ms", "600000"); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + test_consumer_assign_partition( + "do_test_fetch_from_follower_offset_retry", c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + /* Since there are no messages, this poll only waits for metadata, and + * then sets the preferred replica after the first fetch request. + * Subsequent polls are for waiting up to 5 minutes. */ + for (i = 0; i < 7; i++) { + test_consumer_poll_no_msgs( + "initial metadata and preferred replica set", c, 0, 40000); + } + + + /* Seek to end to trigger ListOffsets */ + seek = rd_kafka_topic_partition_list_new(1); + rktpar = rd_kafka_topic_partition_list_add(seek, topic, 0); + rktpar->offset = RD_KAFKA_OFFSET_END; + + /* Increase RTT for this ListOffsets */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_ListOffsets, 1, RD_KAFKA_RESP_ERR_NO_ERROR, + 40 * 1000); + + rd_kafka_seek_partitions(c, seek, -1); + rd_kafka_topic_partition_list_destroy(seek); + + /* Wait lease expiry */ + rd_sleep(50); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + + +int main_8001_fetch_from_follower_mock_manual(int argc, char **argv) { + + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL\n"); + return 0; + } + + do_test_fetch_from_follower_offset_retry(); + + return 0; +} diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/tests/CMakeLists.txt new/librdkafka-2.1.1/tests/CMakeLists.txt --- old/librdkafka-2.1.0/tests/CMakeLists.txt 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/tests/CMakeLists.txt 2023-04-28 09:54:05.000000000 +0200 @@ -129,7 +129,9 @@ 0136-resolve_cb.c 0137-barrier_batch_consume.c 0138-admin_mock.c + 0139-offset_validation_mock.c 8000-idle.cpp + 8001-fetch_from_follower_mock_manual.c test.c testcpp.cpp rusage.c diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/tests/Makefile new/librdkafka-2.1.1/tests/Makefile --- old/librdkafka-2.1.0/tests/Makefile 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/tests/Makefile 2023-04-28 09:54:05.000000000 +0200 @@ -12,7 +12,7 @@ LDFLAGS += -rdynamic -L../src -L../src-cpp # Latest Kafka version -KAFKA_VERSION?=3.1.0 +KAFKA_VERSION?=3.4.0 # Kafka versions for compatibility tests COMPAT_KAFKA_VERSIONS?=0.8.2.2 0.9.0.1 0.11.0.3 1.0.2 2.4.1 2.8.1 $(KAFKA_VERSION) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/tests/test.c new/librdkafka-2.1.1/tests/test.c --- old/librdkafka-2.1.0/tests/test.c 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/tests/test.c 2023-04-28 09:54:05.000000000 +0200 @@ -246,9 +246,12 @@ _TEST_DECL(0136_resolve_cb); _TEST_DECL(0137_barrier_batch_consume); _TEST_DECL(0138_admin_mock); +_TEST_DECL(0139_offset_validation_mock); + /* Manual tests */ _TEST_DECL(8000_idle); 
+_TEST_DECL(8001_fetch_from_follower_mock_manual); /* Define test resource usage thresholds if the default limits @@ -490,9 +493,11 @@ _TEST(0136_resolve_cb, TEST_F_LOCAL), _TEST(0137_barrier_batch_consume, 0), _TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)), + _TEST(0139_offset_validation_mock, 0), /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), + _TEST(8001_fetch_from_follower_mock_manual, TEST_F_MANUAL), {NULL}}; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/vcpkg.json new/librdkafka-2.1.1/vcpkg.json --- old/librdkafka-2.1.0/vcpkg.json 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/vcpkg.json 2023-04-28 09:54:05.000000000 +0200 @@ -1,6 +1,6 @@ { "name": "librdkafka", - "version": "2.1.0", + "version": "2.1.1", "dependencies": [ { "name": "zstd", diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-2.1.0/win32/tests/tests.vcxproj new/librdkafka-2.1.1/win32/tests/tests.vcxproj --- old/librdkafka-2.1.0/win32/tests/tests.vcxproj 2023-04-03 21:45:45.000000000 +0200 +++ new/librdkafka-2.1.1/win32/tests/tests.vcxproj 2023-04-28 09:54:05.000000000 +0200 @@ -219,7 +219,9 @@ <ClCompile Include="..\..\tests\0136-resolve_cb.c" /> <ClCompile Include="..\..\tests\0137-barrier_batch_consume.c" /> <ClCompile Include="..\..\tests\0138-admin_mock.c" /> + <ClCompile Include="..\..\tests\0139-offset_validation_mock.c" /> <ClCompile Include="..\..\tests\8000-idle.cpp" /> + <ClCompile Include="..\..\tests\8001-fetch_from_follower_mock_manual.c" /> <ClCompile Include="..\..\tests\test.c" /> <ClCompile Include="..\..\tests\testcpp.cpp" /> <ClCompile Include="..\..\tests\rusage.c" />
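For completeness: the queue-forwarding scenario exercised by the new
do_test_rejoin_after_interval_expire test corresponds roughly to the following
application setup (a sketch; poll_forwarded_queue and app_q are illustrative
names, not library API):

    #include <librdkafka/rdkafka.h>

    /* Forward the consumer queue to an application-owned queue. The
     * RD_KAFKA_Q_F_CONSUMER flag now propagates to forward targets, so
     * polling app_q also resets max.poll.interval.ms and lets the
     * consumer rejoin after an interval expiry. */
    static void poll_forwarded_queue(rd_kafka_t *rk) {
            rd_kafka_queue_t *consumer_q = rd_kafka_queue_get_consumer(rk);
            rd_kafka_queue_t *app_q      = rd_kafka_queue_new(rk);
            rd_kafka_event_t *ev;

            rd_kafka_queue_forward(consumer_q, app_q);

            while ((ev = rd_kafka_queue_poll(app_q, 1000 /* ms */))) {
                    /* Rebalance, fetch and error events arrive here. */
                    rd_kafka_event_destroy(ev);
            }

            rd_kafka_queue_destroy(app_q);
            rd_kafka_queue_destroy(consumer_q); /* before consumer_close() */
    }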
