This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit d63bf0d5aa29a3ceb3ce671e5e12bd7e4204f3a4 Author: morningman <[email protected]> AuthorDate: Sat Mar 11 22:24:29 2023 +0800 [fix](routine load) fix ROUTINE LOAD bug,kafka commit a lack of one(#17282) #17291 cherry-pick #17282 --- be/src/runtime/routine_load/routine_load_task_executor.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index a5535199e2..b4fd4b0f4f 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -331,8 +331,9 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool std::vector<RdKafka::TopicPartition*> topic_partitions; for (auto& kv : ctx->kafka_info->cmt_offset) { - RdKafka::TopicPartition* tp1 = - RdKafka::TopicPartition::create(ctx->kafka_info->topic, kv.first, kv.second); + // The offsets you commit are the offsets of the messages you want to read next + RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create( + ctx->kafka_info->topic, kv.first, kv.secondi + 1); topic_partitions.push_back(tp1); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
