This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 75b3707a28 [refactor](load) add tablet errors when close_wait return error (#9619)
75b3707a28 is described below
commit 75b3707a28a19c20d3b79604dfa80e5803fd59a0
Author: pengxiangyu <[email protected]>
AuthorDate: Sun May 22 21:27:42 2022 +0800
[refactor](load) add tablet errors when close_wait return error (#9619)
---
be/src/olap/delta_writer.cpp | 22 ++--------------------
be/src/olap/delta_writer.h | 8 ++++----
be/src/runtime/tablets_channel.cpp | 21 ++++++++++++++++++---
be/src/runtime/tablets_channel.h | 5 +++++
be/test/olap/delta_writer_test.cpp | 12 ++++++------
.../olap/engine_storage_migration_task_test.cpp | 2 +-
6 files changed, 36 insertions(+), 34 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 613a742195..11ee242449 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -291,9 +291,7 @@ Status DeltaWriter::close() {
return Status::OK();
}
-Status
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec,
-
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
- bool is_broken) {
+Status DeltaWriter::close_wait() {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait()
being called";
@@ -303,15 +301,7 @@ Status
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
}
// return error if previous flush failed
- Status s = _flush_token->wait();
- if (!s.ok()) {
-#ifndef BE_TEST
- PTabletError* tablet_error = tablet_errors->Add();
- tablet_error->set_tablet_id(_tablet->tablet_id());
- tablet_error->set_msg(s.get_error_msg());
-#endif
- return s;
- }
+ RETURN_NOT_OK(_flush_token->wait());
// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
@@ -327,14 +317,6 @@ Status
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
return res;
}
-#ifndef BE_TEST
- if (!is_broken) {
- PTabletInfo* tablet_info = tablet_vec->Add();
- tablet_info->set_tablet_id(_tablet->tablet_id());
- tablet_info->set_schema_hash(_tablet->schema_hash());
- }
-#endif
-
_delta_written_success = true;
const FlushStatistic& stat = _flush_token->get_stats();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 210c4b3eec..3d49c302d3 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,9 +67,7 @@ public:
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
- Status close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec,
- google::protobuf::RepeatedPtrField<PTabletError>*
tablet_errors,
- bool is_broken);
+ Status close_wait();
// abandon current memtable and wait for all pending-flushing memtables to
be destructed.
// mem_consumption() should be 0 after this function returns.
@@ -91,6 +89,8 @@ public:
int64_t tablet_id() { return _tablet->tablet_id(); }
+ int32_t schema_hash() { return _tablet->schema_hash(); }
+
int64_t save_mem_consumption_snapshot();
int64_t get_mem_consumption_snapshot() const;
@@ -133,4 +133,4 @@ private:
int64_t _mem_consumption_snapshot = 0;
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index bc3535e88b..8010956cc4 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -129,14 +129,29 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
for (auto writer : need_wait_writers) {
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE
judge it.
- writer->close_wait(
- tablet_vec, tablet_errors,
- (_broken_tablets.find(writer->tablet_id()) !=
_broken_tablets.end()));
+ _close_wait(writer, tablet_vec, tablet_errors);
}
}
return Status::OK();
}
+void TabletsChannel::_close_wait(DeltaWriter* writer,
+
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
+ Status st = writer->close_wait();
+ if (st.ok()) {
+ if (_broken_tablets.find(writer->tablet_id()) ==
_broken_tablets.end()) {
+ PTabletInfo* tablet_info = tablet_vec->Add();
+ tablet_info->set_tablet_id(writer->tablet_id());
+ tablet_info->set_schema_hash(writer->schema_hash());
+ }
+ } else {
+ PTabletError* tablet_error = tablet_errors->Add();
+ tablet_error->set_tablet_id(writer->tablet_id());
+ tablet_error->set_msg(st.get_error_msg());
+ }
+}
+
Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 5ad955c61c..7c3bb3300c 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -97,6 +97,11 @@ private:
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& request);
+ // deal with DeltaWriter close_wait(), add tablet to list for return.
+ void _close_wait(DeltaWriter* writer,
+ google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec,
+ google::protobuf::RepeatedPtrField<PTabletError>*
tablet_error);
+
// id of this load channel
TabletsChannelKey _key;
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 2096900dbf..2326bd156a 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -367,7 +367,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->close_wait(nullptr, nullptr, false);
+ res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);
@@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->close_wait(nullptr, nullptr, false);
+ res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);
@@ -475,7 +475,7 @@ TEST_F(TestDeltaWriter, write) {
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->close_wait(nullptr, nullptr, false);
+ res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
// publish version success
@@ -609,7 +609,7 @@ TEST_F(TestDeltaWriter, vec_write) {
res = delta_writer->close();
ASSERT_TRUE(res.ok());
- res = delta_writer->close_wait(nullptr, nullptr, false);
+ res = delta_writer->close_wait();
ASSERT_TRUE(res.ok());
// publish version success
@@ -687,7 +687,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->close_wait(nullptr, nullptr, false);
+ res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
// publish version success
@@ -772,7 +772,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
res = delta_writer->close();
ASSERT_TRUE(res.ok());
- res = delta_writer->close_wait(nullptr, nullptr, false);
+ res = delta_writer->close_wait();
ASSERT_TRUE(res.ok());
// publish version success
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp
index df5f685eff..608d4b6021 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -194,7 +194,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->close_wait(nullptr, nullptr, false);
+ res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
// publish version success
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]