This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new c97f799ceda [opt](routine-load) end Kafka consume when meets partition EOF #32046 (#37442)
c97f799ceda is described below
commit c97f799ceda3b7cbc336e4ea2f75266e1d807275
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 17:18:45 2024 +0800
[opt](routine-load) end Kafka consume when meets partition EOF #32046
(#37442)
pick #32046
Co-authored-by: HHoflittlefish777
<[email protected]>
---
be/src/runtime/routine_load/data_consumer.cpp | 15 ++++++++++++++-
be/src/runtime/routine_load/data_consumer.h | 1 +
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 4d751751003..ccf5fb4cb25 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -80,7 +80,7 @@ Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
};
RETURN_IF_ERROR(set_conf("metadata.broker.list",
ctx->kafka_info->brokers));
- RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
+ RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
// TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
@@ -162,6 +162,7 @@ Status KafkaDataConsumer::assign_topic_partitions(
RdKafka::TopicPartition* tp1 =
RdKafka::TopicPartition::create(topic, entry.first,
entry.second);
topic_partitions.push_back(tp1);
+ _consuming_partition_ids.insert(entry.first);
ss << "[" << entry.first << ": " << entry.second << "] ";
}
@@ -219,6 +220,9 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
consumer_watch.stop();
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
+ if (_consuming_partition_ids.count(msg->partition()) <= 0) {
+ _consuming_partition_ids.insert(msg->partition());
+ }
if (msg->len() == 0) {
// ignore msg with length 0.
// put empty msg into queue will cause the load process
shutting down.
@@ -245,6 +249,15 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
break;
}
[[fallthrough]];
+ case RdKafka::ERR__PARTITION_EOF: {
+ LOG(INFO) << "consumer meet partition eof: " << _id
+ << " partition offset: " << msg->offset();
+ _consuming_partition_ids.erase(msg->partition());
+ if (_consuming_partition_ids.size() <= 0) {
+ done = true;
+ }
+ break;
+ }
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " <<
msg->errstr();
done = true;
diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h
index 596817b6b5d..f6c10467786 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -160,6 +160,7 @@ private:
std::string _brokers;
std::string _topic;
std::unordered_map<std::string, std::string> _custom_properties;
+ std::set<int32_t> _consuming_partition_ids;
KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]