This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d63bf0d5aa29a3ceb3ce671e5e12bd7e4204f3a4
Author: morningman <[email protected]>
AuthorDate: Sat Mar 11 22:24:29 2023 +0800

    [fix](routine load) fix ROUTINE LOAD bug: Kafka committed offset was off by one (#17282) (#17291)
    
    cherry-pick #17282
---
 be/src/runtime/routine_load/routine_load_task_executor.cpp | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index a5535199e2..b4fd4b0f4f 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -331,8 +331,9 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
 
         std::vector<RdKafka::TopicPartition*> topic_partitions;
         for (auto& kv : ctx->kafka_info->cmt_offset) {
-            RdKafka::TopicPartition* tp1 =
-                    RdKafka::TopicPartition::create(ctx->kafka_info->topic, kv.first, kv.second);
+            // The offsets you commit are the offsets of the messages you want to read next
+            RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create(
+                    ctx->kafka_info->topic, kv.first, kv.second + 1);
             topic_partitions.push_back(tp1);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to