This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 538df28 [improvement](routine-load) Support routine load task succeed
with empty data consumed (#8256)
538df28 is described below
commit 538df287372f42b69a01bf44e765477e55c5409b
Author: caiconghui <[email protected]>
AuthorDate: Thu Mar 3 22:35:50 2022 +0800
[improvement](routine-load) Support routine load task succeed with empty
data consumed (#8256)
---
be/src/runtime/routine_load/data_consumer_group.cpp | 17 ++++-------------
1 file changed, 4 insertions(+), 13 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 9096e15..5f6c789 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -146,19 +146,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
return result_st;
}
- if (left_bytes == ctx->max_batch_size) {
- // nothing to be consumed, we have to cancel it, because
- // we do not allow finishing stream load pipe without data
- kafka_pipe->cancel("no data");
- return Status::Cancelled("Cancelled");
- } else {
- DCHECK(left_bytes < ctx->max_batch_size);
- DCHECK(left_rows < ctx->max_batch_rows);
- kafka_pipe->finish();
- ctx->kafka_info->cmt_offset = std::move(cmt_offset);
- ctx->receive_bytes = ctx->max_batch_size - left_bytes;
- return Status::OK();
- }
+ kafka_pipe->finish();
+ ctx->kafka_info->cmt_offset = std::move(cmt_offset);
+ ctx->receive_bytes = ctx->max_batch_size - left_bytes;
+ return Status::OK();
}
RdKafka::Message* msg;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]