This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 75b3707a28 [refactor](load) add tablet errors when close_wait return error (#9619)
75b3707a28 is described below

commit 75b3707a28a19c20d3b79604dfa80e5803fd59a0
Author: pengxiangyu <[email protected]>
AuthorDate: Sun May 22 21:27:42 2022 +0800

    [refactor](load) add tablet errors when close_wait return error (#9619)
---
 be/src/olap/delta_writer.cpp                       | 22 ++--------------------
 be/src/olap/delta_writer.h                         |  8 ++++----
 be/src/runtime/tablets_channel.cpp                 | 21 ++++++++++++++++++---
 be/src/runtime/tablets_channel.h                   |  5 +++++
 be/test/olap/delta_writer_test.cpp                 | 12 ++++++------
 .../olap/engine_storage_migration_task_test.cpp    |  2 +-
 6 files changed, 36 insertions(+), 34 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 613a742195..11ee242449 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -291,9 +291,7 @@ Status DeltaWriter::close() {
     return Status::OK();
 }
 
-Status 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
-                               
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
-                               bool is_broken) {
+Status DeltaWriter::close_wait() {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() 
being called";
@@ -303,15 +301,7 @@ Status 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
     }
 
     // return error if previous flush failed
-    Status s = _flush_token->wait();
-    if (!s.ok()) {
-#ifndef BE_TEST
-        PTabletError* tablet_error = tablet_errors->Add();
-        tablet_error->set_tablet_id(_tablet->tablet_id());
-        tablet_error->set_msg(s.get_error_msg());
-#endif
-        return s;
-    }
+    RETURN_NOT_OK(_flush_token->wait());
 
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
@@ -327,14 +317,6 @@ Status 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
         return res;
     }
 
-#ifndef BE_TEST
-    if (!is_broken) {
-        PTabletInfo* tablet_info = tablet_vec->Add();
-        tablet_info->set_tablet_id(_tablet->tablet_id());
-        tablet_info->set_schema_hash(_tablet->schema_hash());
-    }
-#endif
-
     _delta_written_success = true;
 
     const FlushStatistic& stat = _flush_token->get_stats();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 210c4b3eec..3d49c302d3 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,9 +67,7 @@ public:
     Status close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    Status close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
-                      google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_errors,
-                      bool is_broken);
+    Status close_wait();
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -91,6 +89,8 @@ public:
 
     int64_t tablet_id() { return _tablet->tablet_id(); }
 
+    int32_t schema_hash() { return _tablet->schema_hash(); }
+
     int64_t save_mem_consumption_snapshot();
 
     int64_t get_mem_consumption_snapshot() const;
@@ -133,4 +133,4 @@ private:
     int64_t _mem_consumption_snapshot = 0;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index bc3535e88b..8010956cc4 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -129,14 +129,29 @@ Status TabletsChannel::close(int sender_id, int64_t 
backend_id, bool* finished,
         for (auto writer : need_wait_writers) {
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE 
judge it.
-            writer->close_wait(
-                    tablet_vec, tablet_errors,
-                    (_broken_tablets.find(writer->tablet_id()) != 
_broken_tablets.end()));
+            _close_wait(writer, tablet_vec, tablet_errors);
         }
     }
     return Status::OK();
 }
 
+void TabletsChannel::_close_wait(DeltaWriter* writer,
+                                 
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                                 
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
+    Status st = writer->close_wait();
+    if (st.ok()) {
+        if (_broken_tablets.find(writer->tablet_id()) == 
_broken_tablets.end()) {
+            PTabletInfo* tablet_info = tablet_vec->Add();
+            tablet_info->set_tablet_id(writer->tablet_id());
+            tablet_info->set_schema_hash(writer->schema_hash());
+        }
+    } else {
+        PTabletError* tablet_error = tablet_errors->Add();
+        tablet_error->set_tablet_id(writer->tablet_id());
+        tablet_error->set_msg(st.get_error_msg());
+    }
+}
+
 Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 5ad955c61c..7c3bb3300c 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -97,6 +97,11 @@ private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
+    // deal with DeltaWriter close_wait(), add tablet to list for return.
+    void _close_wait(DeltaWriter* writer,
+                     google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
+                     google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_error);
+
     // id of this load channel
     TabletsChannelKey _key;
 
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 2096900dbf..2326bd156a 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -367,7 +367,7 @@ TEST_F(TestDeltaWriter, open) {
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait(nullptr, nullptr, false);
+    res = delta_writer->close_wait();
     EXPECT_EQ(Status::OK(), res);
     SAFE_DELETE(delta_writer);
 
@@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) {
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait(nullptr, nullptr, false);
+    res = delta_writer->close_wait();
     EXPECT_EQ(Status::OK(), res);
     SAFE_DELETE(delta_writer);
 
@@ -475,7 +475,7 @@ TEST_F(TestDeltaWriter, write) {
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait(nullptr, nullptr, false);
+    res = delta_writer->close_wait();
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
@@ -609,7 +609,7 @@ TEST_F(TestDeltaWriter, vec_write) {
 
     res = delta_writer->close();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->close_wait(nullptr, nullptr, false);
+    res = delta_writer->close_wait();
     ASSERT_TRUE(res.ok());
 
     // publish version success
@@ -687,7 +687,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait(nullptr, nullptr, false);
+    res = delta_writer->close_wait();
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
@@ -772,7 +772,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
 
     res = delta_writer->close();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->close_wait(nullptr, nullptr, false);
+    res = delta_writer->close_wait();
     ASSERT_TRUE(res.ok());
 
     // publish version success
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index df5f685eff..608d4b6021 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -194,7 +194,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait(nullptr, nullptr, false);
+    res = delta_writer->close_wait();
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to