This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c97f799ceda [opt](routine-load) end Kafka consume when meets partition EOF #32046 (#37442)
c97f799ceda is described below

commit c97f799ceda3b7cbc336e4ea2f75266e1d807275
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 17:18:45 2024 +0800

    [opt](routine-load) end Kafka consume when meets partition EOF #32046 (#37442)
    
    pick #32046
    
    Co-authored-by: HHoflittlefish777 <[email protected]>
---
 be/src/runtime/routine_load/data_consumer.cpp | 15 ++++++++++++++-
 be/src/runtime/routine_load/data_consumer.h   |  1 +
 2 files changed, 15 insertions(+), 1 deletion(-)
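
For context: enable.partition.eof is a standard librdkafka consumer property. With it set to "true", consume() hands back a synthetic message whose err() is RdKafka::ERR__PARTITION_EOF each time the consumer reaches the current end of an assigned partition, instead of waiting there silently. Below is a minimal sketch of enabling it against the plain librdkafka C++ API; the make_eof_consumer helper and its parameters are illustrative only, not part of this patch.

    #include <librdkafka/rdkafkacpp.h>
    #include <memory>
    #include <string>

    // Illustrative helper (not part of the patch): build a consumer that
    // reports partition EOF instead of blocking at the end of the log.
    std::unique_ptr<RdKafka::KafkaConsumer> make_eof_consumer(
            const std::string& brokers, const std::string& group_id,
            std::string& errstr) {
        std::unique_ptr<RdKafka::Conf> conf(
                RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        if (conf->set("metadata.broker.list", brokers, errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK ||
            // The one-line flip this patch makes: ask librdkafka to emit
            // ERR__PARTITION_EOF once a partition is fully caught up.
            conf->set("enable.partition.eof", "true", errstr) != RdKafka::Conf::CONF_OK) {
            return nullptr;  // errstr explains which property failed
        }
        return std::unique_ptr<RdKafka::KafkaConsumer>(
                RdKafka::KafkaConsumer::create(conf.get(), errstr));
    }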

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 4d751751003..ccf5fb4cb25 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -80,7 +80,7 @@ Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
     };
 
     RETURN_IF_ERROR(set_conf("metadata.broker.list", 
ctx->kafka_info->brokers));
-    RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
+    RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
     RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
     RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
@@ -162,6 +162,7 @@ Status KafkaDataConsumer::assign_topic_partitions(
         RdKafka::TopicPartition* tp1 =
                 RdKafka::TopicPartition::create(topic, entry.first, entry.second);
         topic_partitions.push_back(tp1);
+        _consuming_partition_ids.insert(entry.first);
         ss << "[" << entry.first << ": " << entry.second << "] ";
     }
 
@@ -219,6 +220,9 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
         consumer_watch.stop();
         switch (msg->err()) {
         case RdKafka::ERR_NO_ERROR:
+            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
+                _consuming_partition_ids.insert(msg->partition());
+            }
             if (msg->len() == 0) {
                 // ignore msg with length 0.
                // put empty msg into queue will cause the load process shutting down.
@@ -245,6 +249,15 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                 break;
             }
             [[fallthrough]];
+        case RdKafka::ERR__PARTITION_EOF: {
+            LOG(INFO) << "consumer meet partition eof: " << _id
+                      << " partition offset: " << msg->offset();
+            _consuming_partition_ids.erase(msg->partition());
+            if (_consuming_partition_ids.size() <= 0) {
+                done = true;
+            }
+            break;
+        }
         default:
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
             done = true;
diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h
index 596817b6b5d..f6c10467786 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -160,6 +160,7 @@ private:
     std::string _brokers;
     std::string _topic;
     std::unordered_map<std::string, std::string> _custom_properties;
+    std::set<int32_t> _consuming_partition_ids;
 
     KafkaEventCb _k_event_cb;
     RdKafka::KafkaConsumer* _k_consumer = nullptr;
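
Taken together, the hunks above implement a drain-until-EOF loop: every assigned partition id is recorded, an id is re-added when a normal message arrives for it, and each ERR__PARTITION_EOF removes one id until the set is empty and consumption stops. Below is a standalone sketch of the same pattern against the plain librdkafka API; the function name and wiring are hypothetical, and the real logic lives in KafkaDataConsumer::group_consume.

    #include <librdkafka/rdkafkacpp.h>
    #include <iostream>
    #include <set>

    // Sketch of the patch's EOF bookkeeping: remember every partition id
    // being consumed, drop an id when librdkafka reports ERR__PARTITION_EOF
    // for it, and stop once the set is empty (all partitions caught up).
    void consume_until_caught_up(RdKafka::KafkaConsumer* consumer,
                                 std::set<int32_t> consuming_partition_ids) {
        bool done = false;
        while (!done) {
            RdKafka::Message* msg = consumer->consume(/*timeout_ms=*/1000);
            switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
                // A partition that already hit EOF may deliver again if new
                // data lands, so re-add its id (mirrors the patch's insert).
                consuming_partition_ids.insert(msg->partition());
                // ... hand msg->payload()/msg->len() to the load pipeline ...
                break;
            case RdKafka::ERR__TIMED_OUT:
                break;  // nothing new this round; keep polling
            case RdKafka::ERR__PARTITION_EOF:
                consuming_partition_ids.erase(msg->partition());
                done = consuming_partition_ids.empty();
                break;
            default:
                std::cerr << "kafka consume failed: " << msg->errstr() << "\n";
                done = true;
                break;
            }
            delete msg;
        }
    }

Ending the loop once every partition has reported EOF lets a routine load task finish as soon as the backlog is drained, presumably instead of waiting out its consume timeout; re-adding the id on a normal message keeps the bookkeeping correct when new data lands on an already-drained partition before the others catch up.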


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
