This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 96acef908a6 [fix](move-memtable) check eos when close stream (#29547)
96acef908a6 is described below

commit 96acef908a66402611ef87b35f0879a3da04ee2a
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jan 4 22:56:52 2024 +0800

    [fix](move-memtable) check eos when close stream (#29547)
---
 be/src/runtime/load_stream.cpp       |  5 +++--
 be/src/runtime/load_stream.h         |  4 ++--
 be/src/vec/sink/load_stream_stub.cpp |  4 ++++
 be/src/vec/sink/load_stream_stub.h   | 14 +++++++++++++-
 gensrc/proto/internal_service.proto  |  1 +
 5 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 1bc7e7b6637..682733e3bdd 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -378,11 +378,12 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
 
 void LoadStream::_report_result(StreamId stream, const Status& status,
                                 const std::vector<int64_t>& success_tablet_ids,
-                                const FailedTablets& failed_tablets) {
+                                const FailedTablets& failed_tablets, bool eos) {
     LOG(INFO) << "report result " << *this << ", success tablet num " << success_tablet_ids.size()
               << ", failed tablet num " << failed_tablets.size();
     butil::IOBuf buf;
     PLoadStreamResponse response;
+    response.set_eos(eos);
     status.to_protobuf(response.mutable_status());
     for (auto& id : success_tablet_ids) {
         response.add_success_tablet_ids(id);
@@ -534,7 +534,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
         FailedTablets failed_tablets;
         std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
         auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets);
-        _report_result(id, st, success_tablet_ids, failed_tablets);
+        _report_result(id, st, success_tablet_ids, failed_tablets, true);
         brpc::StreamClose(id);
     } break;
     case PStreamHeader::GET_SCHEMA: {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index e3bbafe01b6..ba6ad11c06a 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -135,7 +135,7 @@ private:
 
     void _report_result(StreamId stream, const Status& status,
                         const std::vector<int64_t>& success_tablet_ids,
-                        const FailedTablets& failed_tablets);
+                        const FailedTablets& failed_tablets, bool eos);
     void _report_schema(StreamId stream, const PStreamHeader& hdr);
 
     // report failure for one message
@@ -144,7 +144,7 @@ private:
         if (header.has_tablet_id()) {
             failed_tablets.emplace_back(header.tablet_id(), status);
         }
-        _report_result(stream, status, {}, failed_tablets);
+        _report_result(stream, status, {}, failed_tablets, false);
     }
 
 private:
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 939c88e0b65..fdd9762330b 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -36,6 +36,10 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
         PLoadStreamResponse response;
         response.ParseFromZeroCopyStream(&wrapper);
 
+        if (response.eos()) {
+            _is_eos.store(true);
+        }
+
         Status st = Status::create(response.status());
 
         std::stringstream ss;
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index a5c9d7464a4..f7d6844fe6b 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -99,6 +99,8 @@ private:
 
         bool is_closed() { return _is_closed.load(); }
 
+        bool is_eos() { return _is_eos.load(); }
+
         Status close_wait(int64_t timeout_ms) {
             DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
             std::unique_lock<bthread::Mutex> lock(_mutex);
@@ -106,7 +108,16 @@ private:
                 return Status::OK();
             }
             int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
-            return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream close_wait timeout");
+            if (ret != 0) {
+                return Status::InternalError(
+                        "stream close_wait timeout, load_id={}, be_id={}, error={}",
+                        _load_id.to_string(), _dst_id, ret);
+            }
+            if (!_is_eos.load()) {
+                return Status::InternalError("stream closed without eos, load_id={} be_id={}",
+                                             _load_id.to_string(), _dst_id);
+            }
+            return Status::OK();
         };
 
         std::vector<int64_t> success_tablets() {
@@ -126,6 +137,7 @@ private:
         UniqueId _load_id;    // for logging
         int64_t _dst_id = -1; // for logging
         std::atomic<bool> _is_closed;
+        std::atomic<bool> _is_eos;
         bthread::Mutex _mutex;
         bthread::ConditionVariable _close_cv;
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index cefcd073b84..351bf5caf82 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -778,6 +778,7 @@ message PLoadStreamResponse {
     repeated PFailedTablet failed_tablets = 3;
     optional bytes load_stream_profile = 4;
     repeated PTabletSchemaWithIndex tablet_schemas = 5;
+    optional bool eos = 6;
 }
 
 message PStreamHeader {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to