This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 96acef908a6 [fix](move-memtable) check eos when close stream (#29547)
96acef908a6 is described below
commit 96acef908a66402611ef87b35f0879a3da04ee2a
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jan 4 22:56:52 2024 +0800
[fix](move-memtable) check eos when close stream (#29547)
---
be/src/runtime/load_stream.cpp | 5 +++--
be/src/runtime/load_stream.h | 4 ++--
be/src/vec/sink/load_stream_stub.cpp | 4 ++++
be/src/vec/sink/load_stream_stub.h | 14 +++++++++++++-
gensrc/proto/internal_service.proto | 1 +
5 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 1bc7e7b6637..682733e3bdd 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -378,11 +378,12 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
void LoadStream::_report_result(StreamId stream, const Status& status,
const std::vector<int64_t>& success_tablet_ids,
- const FailedTablets& failed_tablets) {
+ const FailedTablets& failed_tablets, bool eos) {
LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
<< ", failed tablet num " << failed_tablets.size();
butil::IOBuf buf;
PLoadStreamResponse response;
+ response.set_eos(eos);
status.to_protobuf(response.mutable_status());
for (auto& id : success_tablet_ids) {
response.add_success_tablet_ids(id);
@@ -534,7 +535,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
- _report_result(id, st, success_tablet_ids, failed_tablets);
+ _report_result(id, st, success_tablet_ids, failed_tablets, true);
brpc::StreamClose(id);
} break;
case PStreamHeader::GET_SCHEMA: {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index e3bbafe01b6..ba6ad11c06a 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -135,7 +135,7 @@ private:
void _report_result(StreamId stream, const Status& status,
const std::vector<int64_t>& success_tablet_ids,
- const FailedTablets& failed_tablets);
+ const FailedTablets& failed_tablets, bool eos);
void _report_schema(StreamId stream, const PStreamHeader& hdr);
// report failure for one message
@@ -144,7 +144,7 @@ private:
if (header.has_tablet_id()) {
failed_tablets.emplace_back(header.tablet_id(), status);
}
- _report_result(stream, status, {}, failed_tablets);
+ _report_result(stream, status, {}, failed_tablets, false);
}
private:
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 939c88e0b65..fdd9762330b 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -36,6 +36,10 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
PLoadStreamResponse response;
response.ParseFromZeroCopyStream(&wrapper);
+ if (response.eos()) {
+ _is_eos.store(true);
+ }
+
Status st = Status::create(response.status());
std::stringstream ss;
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index a5c9d7464a4..f7d6844fe6b 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -99,6 +99,8 @@ private:
bool is_closed() { return _is_closed.load(); }
+ bool is_eos() { return _is_eos.load(); }
+
Status close_wait(int64_t timeout_ms) {
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_mutex);
@@ -106,7 +108,16 @@ private:
return Status::OK();
}
int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
- return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream close_wait timeout");
+ if (ret != 0) {
+ return Status::InternalError(
+ "stream close_wait timeout, load_id={}, be_id={}, error={}",
+ _load_id.to_string(), _dst_id, ret);
+ }
+ if (!_is_eos.load()) {
+ return Status::InternalError("stream closed without eos, load_id={} be_id={}",
+ _load_id.to_string(), _dst_id);
+ }
+ return Status::OK();
};
std::vector<int64_t> success_tablets() {
@@ -126,6 +137,7 @@ private:
UniqueId _load_id; // for logging
int64_t _dst_id = -1; // for logging
std::atomic<bool> _is_closed;
+ std::atomic<bool> _is_eos;
bthread::Mutex _mutex;
bthread::ConditionVariable _close_cv;
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index cefcd073b84..351bf5caf82 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -778,6 +778,7 @@ message PLoadStreamResponse {
repeated PFailedTablet failed_tablets = 3;
optional bytes load_stream_profile = 4;
repeated PTabletSchemaWithIndex tablet_schemas = 5;
+ optional bool eos = 6;
}
message PStreamHeader {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]