This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e19f6033c02 [fix](routine-load) fix routine load pause when Kafka data
deleted after TTL(#37288) (#39183)
e19f6033c02 is described below
commit e19f6033c0258219e14d380af92235d5dd269fbf
Author: hui lai <[email protected]>
AuthorDate: Sun Aug 11 17:47:16 2024 +0800
[fix](routine-load) fix routine load pause when Kafka data deleted after
TTL(#37288) (#39183)
pick (#37288)
When using routine load, after the data load is completed, the lag is
still a positive number:
```
Lag:
{"0":16,"1":15,"2":16,"3":16,"4":16,"5":16,"6":15,"7":16,"8":16,"9":16,"10":15,"11":16,"12":15,"13":15,"14":16,"15":16,"16":17,"17":15,"18":16,"19":15,"20":16,"21":16,"22":16,"23":16,"24":15,"25":17,"26":17,"27":16,"28":16,"29":16,"30":16,"31":17,"32":14,"33":16,"34":17,"35":16,"36":15,"37":15,"38":15,"39":16,"40":16,"41":16,"42":15,"43":15,"44":17,"45":16,"46":15,"47":15,"48":16,"49":17,"50":16,"51":15,"52":16,"53":15,"54":15,"55":17,"56":16,"57":17,"58":16,"59":16,"60":15,"61
[...]
```
and the routine load is paused when the Kafka data reaches its TTL and is
deleted; the error is `out of range`.
The reason this happened is that the EOF message has its own offset,
which needs to be included in the offset statistics.
**note(important):**
After the bug is fixed, if you set
```
"property.enable.partition.eof" = "false"
```
in your routine load job, you will still encounter the problem, because
the EOF message carries an offset and this config is true by default in Doris.
---
be/src/runtime/routine_load/data_consumer.cpp | 23 ++++++++------
.../runtime/routine_load/data_consumer_group.cpp | 36 +++++++++++++---------
2 files changed, 35 insertions(+), 24 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index 2874a43ae0e..11953772150 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -260,6 +260,20 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
break;
}
[[fallthrough]];
+ case RdKafka::ERR__PARTITION_EOF: {
+ LOG(INFO) << "consumer meet partition eof: " << _id
+ << " partition offset: " << msg->offset();
+ _consuming_partition_ids.erase(msg->partition());
+ if (!queue->blocking_put(msg.get())) {
+ done = true;
+ } else if (_consuming_partition_ids.size() <= 0) {
+ msg.release();
+ done = true;
+ } else {
+ msg.release();
+ }
+ break;
+ }
case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
done = true;
std::stringstream ss;
@@ -269,15 +283,6 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
st = Status::InternalError<false>(ss.str());
break;
}
- case RdKafka::ERR__PARTITION_EOF: {
- LOG(INFO) << "consumer meet partition eof: " << _id
- << " partition offset: " << msg->offset();
- _consuming_partition_ids.erase(msg->partition());
- if (_consuming_partition_ids.size() <= 0) {
- done = true;
- }
- break;
- }
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " <<
msg->errstr();
done = true;
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp
b/be/src/runtime/routine_load/data_consumer_group.cpp
index 27eea942664..f429dac560c 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -162,22 +162,28 @@ Status
KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
<< ", partition: " << msg->partition() << ", offset: "
<< msg->offset()
<< ", len: " << msg->len();
- Status st = (kafka_pipe.get()->*append_data)(static_cast<const
char*>(msg->payload()),
-
static_cast<size_t>(msg->len()));
- if (st.ok()) {
- left_rows--;
- left_bytes -= msg->len();
- cmt_offset[msg->partition()] = msg->offset();
- VLOG_NOTICE << "consume partition[" << msg->partition() << " -
" << msg->offset()
- << "]";
+ if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
+ if (msg->offset() > 0) {
+ cmt_offset[msg->partition()] = msg->offset() - 1;
+ }
} else {
- // failed to append this msg, we must stop
- LOG(WARNING) << "failed to append msg to pipe. grp: " <<
_grp_id;
- eos = true;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- if (result_st.ok()) {
- result_st = st;
+ Status st = (kafka_pipe.get()->*append_data)(
+ static_cast<const char*>(msg->payload()),
static_cast<size_t>(msg->len()));
+ if (st.ok()) {
+ left_rows--;
+ left_bytes -= msg->len();
+ cmt_offset[msg->partition()] = msg->offset();
+ VLOG_NOTICE << "consume partition[" << msg->partition() <<
" - "
+ << msg->offset() << "]";
+ } else {
+ // failed to append this msg, we must stop
+ LOG(WARNING) << "failed to append msg to pipe. grp: " <<
_grp_id;
+ eos = true;
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+ if (result_st.ok()) {
+ result_st = st;
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]