This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 0e38fa4e9d4b18746987560d84f8b44c59501fa0 Author: pengxiangyu <[email protected]> AuthorDate: Sun May 22 21:27:42 2022 +0800 [refactor](load) add tablet errors when close_wait return error (#9619) --- be/src/olap/delta_writer.cpp | 21 +--- be/src/olap/delta_writer.h | 6 +- be/src/runtime/tablets_channel.cpp | 21 +++- be/src/runtime/tablets_channel.h | 6 +- be/test/olap/delta_writer_test.cpp | 246 ++++++++++++++++++++++++++++++++++++- 5 files changed, 271 insertions(+), 29 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 198019561f..58b89e5d64 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -279,9 +279,7 @@ OLAPStatus DeltaWriter::close() { return OLAP_SUCCESS; } -OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, - google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, - bool is_broken) { +OLAPStatus DeltaWriter::close_wait() { std::lock_guard<std::mutex> l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; @@ -291,14 +289,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf } // return error if previous flush failed - OLAPStatus st = _flush_token->wait(); - if (st != OLAP_SUCCESS) { - PTabletError* tablet_error = tablet_errors->Add(); - tablet_error->set_tablet_id(_tablet->tablet_id()); - tablet_error->set_msg("flush failed"); - return st; - } - DCHECK_EQ(_mem_tracker->consumption(), 0); + RETURN_NOT_OK(_flush_token->wait()); // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); @@ -314,14 +305,6 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf return res; } -#ifndef BE_TEST - if (!is_broken) { - PTabletInfo* tablet_info = tablet_vec->Add(); - tablet_info->set_tablet_id(_tablet->tablet_id()); - tablet_info->set_schema_hash(_tablet->schema_hash()); - } -#endif - _delta_written_success 
= true; const FlushStatistic& stat = _flush_token->get_stats(); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index cf5a2729d2..70164e56c8 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -67,9 +67,7 @@ public: OLAPStatus close(); // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. - OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, - google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, - bool is_broken); + OLAPStatus close_wait(); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. @@ -91,6 +89,8 @@ public: int64_t tablet_id() { return _tablet->tablet_id(); } + int32_t schema_hash() { return _tablet->schema_hash(); } + int64_t save_mem_consumption_snapshot(); int64_t get_mem_consumption_snapshot() const; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 926cba7aaa..19f342360c 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -198,9 +198,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, for (auto writer : need_wait_writers) { // close may return failed, but no need to handle it here. // tablet_vec will only contains success tablet, and then let FE judge it. 
-            writer->close_wait(
-                    tablet_vec, tablet_errors,
-                    (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
+            _close_wait(writer, tablet_vec, tablet_errors);
         }
         // TODO(gaodayue) clear and destruct all delta writers to make sure all memory are freed
         // DCHECK_EQ(_mem_tracker->consumption(), 0);
@@ -208,6 +206,23 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
     return Status::OK();
 }
 
+void TabletsChannel::_close_wait(DeltaWriter* writer,
+                                 google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                                 google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
+    OLAPStatus st = writer->close_wait();
+    if (st == OLAP_SUCCESS) { // success: add to tablet_vec unless the tablet is in _broken_tablets
+        if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) {
+            PTabletInfo* tablet_info = tablet_vec->Add();
+            tablet_info->set_tablet_id(writer->tablet_id());
+            tablet_info->set_schema_hash(writer->schema_hash());
+        }
+    } else { // failure: add a PTabletError carrying the tablet id and the numeric status code
+        PTabletError* tablet_error = tablet_errors->Add();
+        tablet_error->set_tablet_id(writer->tablet_id());
+        tablet_error->set_msg("close wait failed: " + std::to_string(static_cast<int>(st)));
+    }
+}
+
 Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 360242ae88..cd5ee67867 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -89,7 +89,11 @@ private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-private:
+    // deal with DeltaWriter close_wait(), add tablet to list for return.
+ void _close_wait(DeltaWriter* writer, + google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, + google::protobuf::RepeatedPtrField<PTabletError>* tablet_error); + // id of this load channel TabletsChannelKey _key; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 1e2cab2e1e..e51dbe9605 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -371,9 +371,18 @@ TEST_F(TestDeltaWriter, open) { DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); ASSERT_NE(delta_writer, nullptr); res = delta_writer->close(); - ASSERT_EQ(OLAP_SUCCESS, res); - res = delta_writer->close_wait(nullptr, false); - ASSERT_EQ(OLAP_SUCCESS, res); + EXPECT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(); + EXPECT_EQ(OLAP_SUCCESS, res); + SAFE_DELETE(delta_writer); + + // test vec delta writer + DeltaWriter::open(&write_req, &delta_writer, true); + EXPECT_NE(delta_writer, nullptr); + res = delta_writer->close(); + EXPECT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(); + EXPECT_EQ(OLAP_SUCCESS, res); SAFE_DELETE(delta_writer); TDropTabletReq drop_request; @@ -470,9 +479,149 @@ TEST_F(TestDeltaWriter, write) { } res = delta_writer->close(); +<<<<<<< HEAD ASSERT_EQ(OLAP_SUCCESS, res); res = delta_writer->close_wait(nullptr, false); ASSERT_EQ(OLAP_SUCCESS, res); +======= + EXPECT_EQ(Status::OK(), res); + res = delta_writer->close_wait(); + EXPECT_EQ(Status::OK(), res); + + // publish version success + TabletSharedPtr tablet = + k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + OlapMeta* meta = tablet->data_dir()->get_meta(); + Version version; + version.first = tablet->rowset_with_max_version()->end_version() + 1; + version.second = tablet->rowset_with_max_version()->end_version() + 1; + std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; + StorageEngine::instance()->txn_manager()->get_txn_related_tablets( + write_req.txn_id, write_req.partition_id, 
&tablet_related_rs); + for (auto& tablet_rs : tablet_related_rs) { + RowsetSharedPtr rowset = tablet_rs.second; + res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, write_req.schema_hash, + tablet_rs.first.tablet_uid, version); + EXPECT_EQ(Status::OK(), res); + res = tablet->add_inc_rowset(rowset); + EXPECT_EQ(Status::OK(), res); + } + EXPECT_EQ(1, tablet->num_rows()); + + auto tablet_id = 10003; + auto schema_hash = 270068375; + res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + EXPECT_EQ(Status::OK(), res); + delete delta_writer; +} + +TEST_F(TestDeltaWriter, vec_write) { + TCreateTabletReq request; + create_tablet_request(10004, 270068376, &request); + Status res = k_engine->create_tablet(request); + ASSERT_TRUE(res.ok()); + + TDescriptorTable tdesc_tbl = create_descriptor_tablet(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + // const std::vector<SlotDescriptor*>& slots = tuple_desc->slots(); + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002, + 30002, load_id, tuple_desc, &(tuple_desc->slots())}; + DeltaWriter* delta_writer = nullptr; + DeltaWriter::open(&write_req, &delta_writer, true); + ASSERT_NE(delta_writer, nullptr); + + auto tracker = std::make_shared<MemTracker>(); + MemPool pool(tracker.get()); + + vectorized::Block block; + for (const auto& slot_desc : tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + auto columns = block.mutate_columns(); + { + int8_t k1 = -127; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + + int16_t k2 = -32767; + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + + int32_t k3 = 
-2147483647; + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + + int64_t k4 = -9223372036854775807L; + columns[3]->insert_data((const char*)&k4, sizeof(k4)); + + int128_t k5 = -90000; + columns[4]->insert_data((const char*)&k5, sizeof(k5)); + + DateTimeValue k6; + k6.from_date_str("2048-11-10", 10); + auto k6_int = k6.to_int64(); + columns[5]->insert_data((const char*)&k6_int, sizeof(k6_int)); + + DateTimeValue k7; + k7.from_date_str("2636-08-16 19:39:43", 19); + auto k7_int = k7.to_int64(); + columns[6]->insert_data((const char*)&k7_int, sizeof(k7_int)); + + columns[7]->insert_data("abcd", 4); + columns[8]->insert_data("abcde", 5); + + DecimalV2Value decimal_value; + decimal_value.assign_from_double(1.1); + columns[9]->insert_data((const char*)&decimal_value, sizeof(decimal_value)); + + int8_t v1 = -127; + columns[10]->insert_data((const char*)&v1, sizeof(v1)); + + int16_t v2 = -32767; + columns[11]->insert_data((const char*)&v2, sizeof(v2)); + + int32_t v3 = -2147483647; + columns[12]->insert_data((const char*)&v3, sizeof(v3)); + + int64_t v4 = -9223372036854775807L; + columns[13]->insert_data((const char*)&v4, sizeof(v4)); + + int128_t v5 = -90000; + columns[14]->insert_data((const char*)&v5, sizeof(v5)); + + DateTimeValue v6; + v6.from_date_str("2048-11-10", 10); + auto v6_int = v6.to_int64(); + columns[15]->insert_data((const char*)&v6_int, sizeof(v6_int)); + + DateTimeValue v7; + v7.from_date_str("2636-08-16 19:39:43", 19); + auto v7_int = v7.to_int64(); + columns[16]->insert_data((const char*)&v7_int, sizeof(v7_int)); + + columns[17]->insert_data("abcd", 4); + columns[18]->insert_data("abcde", 5); + + decimal_value.assign_from_double(1.1); + columns[19]->insert_data((const char*)&decimal_value, sizeof(decimal_value)); + + res = delta_writer->write(&block, {0}); + ASSERT_TRUE(res.ok()); + } + + res = delta_writer->close(); + ASSERT_TRUE(res.ok()); + res = delta_writer->close_wait(); + ASSERT_TRUE(res.ok()); +>>>>>>> 75b3707a2 ([refactor](load) add 
tablet errors when close_wait return error (#9619)) // publish version success TabletSharedPtr tablet = @@ -548,9 +697,100 @@ TEST_F(TestDeltaWriter, sequence_col) { } res = delta_writer->close(); +<<<<<<< HEAD ASSERT_EQ(OLAP_SUCCESS, res); res = delta_writer->close_wait(nullptr, false); ASSERT_EQ(OLAP_SUCCESS, res); +======= + EXPECT_EQ(Status::OK(), res); + res = delta_writer->close_wait(); + EXPECT_EQ(Status::OK(), res); + + // publish version success + TabletSharedPtr tablet = + k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + OlapMeta* meta = tablet->data_dir()->get_meta(); + Version version; + version.first = tablet->rowset_with_max_version()->end_version() + 1; + version.second = tablet->rowset_with_max_version()->end_version() + 1; + std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; + StorageEngine::instance()->txn_manager()->get_txn_related_tablets( + write_req.txn_id, write_req.partition_id, &tablet_related_rs); + for (auto& tablet_rs : tablet_related_rs) { + RowsetSharedPtr rowset = tablet_rs.second; + res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, write_req.schema_hash, + tablet_rs.first.tablet_uid, version); + EXPECT_EQ(Status::OK(), res); + res = tablet->add_inc_rowset(rowset); + EXPECT_EQ(Status::OK(), res); + } + EXPECT_EQ(1, tablet->num_rows()); + + auto tablet_id = 10005; + auto schema_hash = 270068377; + res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + EXPECT_EQ(Status::OK(), res); + delete delta_writer; +} + +TEST_F(TestDeltaWriter, vec_sequence_col) { + TCreateTabletReq request; + sleep(20); + create_tablet_request_with_sequence_col(10005, 270068377, &request); + Status res = k_engine->create_tablet(request); + ASSERT_TRUE(res.ok()); + + TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + DescriptorTbl::create(&obj_pool, 
tdesc_tbl, &desc_tbl); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, + 30003, load_id, tuple_desc, &(tuple_desc->slots())}; + DeltaWriter* delta_writer = nullptr; + DeltaWriter::open(&write_req, &delta_writer, true); + ASSERT_NE(delta_writer, nullptr); + + MemTracker tracker; + MemPool pool(&tracker); + + vectorized::Block block; + for (const auto& slot_desc : tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + auto columns = block.mutate_columns(); + { + int8_t c1 = 123; + columns[0]->insert_data((const char*)&c1, sizeof(c1)); + + int16_t c2 = 456; + columns[1]->insert_data((const char*)&c2, sizeof(c2)); + + int32_t c3 = 1; + columns[2]->insert_data((const char*)&c3, sizeof(c2)); + + DateTimeValue c4; + c4.from_date_str("2020-07-16 19:39:43", 19); + int64_t c4_int = c4.to_int64(); + columns[3]->insert_data((const char*)&c4_int, sizeof(c4)); + + res = delta_writer->write(&block, {0}); + ASSERT_TRUE(res.ok()); + } + + res = delta_writer->close(); + ASSERT_TRUE(res.ok()); + res = delta_writer->close_wait(); + ASSERT_TRUE(res.ok()); +>>>>>>> 75b3707a2 ([refactor](load) add tablet errors when close_wait return error (#9619)) // publish version success TabletSharedPtr tablet = --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
