This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 5e3277e8fbc [improvement](routine-load) add routine load rows check (#25818)
5e3277e8fbc is described below
commit 5e3277e8fbc92f08344bc86e9b2c74033a8bbc01
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Oct 25 11:04:28 2023 +0800
[improvement](routine-load) add routine load rows check (#25818)
---
 be/src/runtime/routine_load/data_consumer_group.cpp        | 3 ++-
 be/src/runtime/routine_load/data_consumer_group.h          | 6 ++++++
 be/src/runtime/routine_load/routine_load_task_executor.cpp | 9 +++++++++
 3 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 60e7c57a6c1..c54bbbd99dd 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -125,9 +125,10 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
     bool eos = false;
     while (true) {
         if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
+            _rows = ctx->max_batch_rows - left_rows;
             LOG(INFO) << "consumer group done: " << _grp_id
                       << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
-                      << ", received rows=" << ctx->max_batch_rows - left_rows
+                      << ", received rows=" << _rows
                       << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos
                       << ", left_time: " << left_time << ", left_rows: " << left_rows
                       << ", left_bytes: " << left_bytes
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index e15ad7115f6..0cda80a9ec4 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -60,6 +60,10 @@ public:
         ++_counter;
     }
 
+    int64_t get_consumer_rows() const { return _rows; }
+
+    void set_consumer_rows(int64_t rows) { _rows = rows; }
+
     // start all consumers
     virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
                              std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
@@ -77,6 +81,8 @@ protected:
     // when the counter becomes zero, shutdown the queue to finish
     std::mutex _mutex;
    int _counter;
+    // received total rows
+    int64_t _rows {0};
 };
 
 // for kafka
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 445e78e06f3..c9769b43b44 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -355,6 +355,15 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     // wait for all consumers finished
     HANDLE_ERROR(ctx->future.get(), "consume failed");
 
+    // check received and load rows
+    LOG(INFO) << "routine load task received rows: " << consumer_grp.get()->get_consumer_rows()
+              << " load total rows: " << ctx.get()->number_total_rows
+              << " loaded rows: " << ctx.get()->number_loaded_rows
+              << " filtered rows: " << ctx.get()->number_filtered_rows
+              << " unselected rows: " << ctx.get()->number_unselected_rows;
+    DCHECK(consumer_grp.get()->get_consumer_rows() == ctx.get()->number_total_rows);
+    consumer_grp.get()->set_consumer_rows(0);
+
     ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
 
     // return the consumer back to pool
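
This executor-side check closes the loop: the row count cached by the consumer group must equal ctx->number_total_rows counted by the load pipeline, and the counter is reset before the group is returned to the pool. A minimal sketch of that invariant, assuming hypothetical stand-in types in place of the Doris classes:

    #include <cassert>
    #include <cstdint>
    #include <iostream>

    // Hypothetical stand-ins for StreamLoadContext and the consumer group.
    struct SketchContext {
        int64_t number_total_rows = 0;       // rows counted by the load pipeline
        int64_t number_loaded_rows = 0;
        int64_t number_filtered_rows = 0;
        int64_t number_unselected_rows = 0;
    };

    struct SketchConsumerGroup {
        int64_t rows = 0;  // rows received from Kafka, set by the consume loop
    };

    int main() {
        SketchConsumerGroup grp;
        grp.rows = 100;  // consume loop computed max_batch_rows - left_rows

        SketchContext ctx;
        ctx.number_loaded_rows = 90;     // per-row outcomes; in this sketch
        ctx.number_filtered_rows = 8;    // they sum to the total
        ctx.number_unselected_rows = 2;
        ctx.number_total_rows = 100;

        std::cout << "received rows: " << grp.rows
                  << ", load total rows: " << ctx.number_total_rows << "\n";
        // debug-only invariant, mirroring the DCHECK in exec_task: every
        // received row must be accounted for by the pipeline's counters
        assert(grp.rows == ctx.number_total_rows);
        grp.rows = 0;  // reset before the consumer group is reused
        return 0;
    }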
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]