This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4e86f9bab5f [improve](move-memtable) include and check offset when append data (#28159)
4e86f9bab5f is described below
commit 4e86f9bab5fa561c6922d58343abe28578a72b52
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Dec 9 16:21:36 2023 +0800
[improve](move-memtable) include and check offset when append data (#28159)
---
be/src/io/fs/stream_sink_file_writer.cpp | 10 ++--
be/src/runtime/load_stream.cpp | 2 +-
be/src/runtime/load_stream_writer.cpp | 71 ++++++++++++++++++++------
be/src/runtime/load_stream_writer.h | 2 +-
be/src/vec/sink/load_stream_stub.cpp | 3 +-
be/src/vec/sink/load_stream_stub.h | 3 +-
be/test/io/fs/stream_sink_file_writer_test.cpp | 4 +-
be/test/runtime/load_stream_test.cpp | 47 ++++++++---------
gensrc/proto/internal_service.proto | 1 +
9 files changed, 94 insertions(+), 49 deletions(-)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp
index 9b2125c9446..484be9e07e9 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -44,7 +44,6 @@ Status StreamSinkFileWriter::appendv(const Slice* data,
size_t data_cnt) {
for (int i = 0; i < data_cnt; i++) {
bytes_req += data[i].get_size();
}
- _bytes_appended += bytes_req;
VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ",
index_id: " << _index_id
<< ", tablet_id: " << _tablet_id << ", segment_id: " <<
_segment_id
@@ -52,9 +51,10 @@ Status StreamSinkFileWriter::appendv(const Slice* data,
size_t data_cnt) {
std::span<const Slice> slices {data, data_cnt};
for (auto& stream : _streams) {
- RETURN_IF_ERROR(
- stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id, slices));
+ RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id,
_tablet_id, _segment_id,
+ _bytes_appended, slices));
}
+ _bytes_appended += bytes_req;
return Status::OK();
}
@@ -63,8 +63,8 @@ Status StreamSinkFileWriter::finalize() {
<< ", tablet_id: " << _tablet_id << ", segment_id: " <<
_segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
for (auto& stream : _streams) {
- RETURN_IF_ERROR(
- stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id, {}, true));
+ RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id,
_tablet_id, _segment_id,
+ _bytes_appended, {}, true));
}
return Status::OK();
}
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 80d82956a0f..9d05d48f54c 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -127,7 +127,7 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header]() {
- auto st = _load_stream_writer->append_data(new_segid, buf);
+ auto st = _load_stream_writer->append_data(new_segid, header.offset(),
buf);
if (eos && st.ok()) {
st = _load_stream_writer->close_segment(new_segid);
}
diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp
index 427ace47d00..0a339e854aa 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -83,14 +83,14 @@ Status LoadStreamWriter::init() {
return Status::OK();
}
-Status LoadStreamWriter::append_data(uint32_t segid, butil::IOBuf buf) {
+Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset,
butil::IOBuf buf) {
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
if (!_is_init) {
RETURN_IF_ERROR(init());
}
- if (segid + 1 > _segment_file_writers.size()) {
+ if (segid >= _segment_file_writers.size()) {
for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
Status st;
io::FileWriterPtr file_writer;
@@ -107,32 +107,70 @@ Status LoadStreamWriter::append_data(uint32_t segid,
butil::IOBuf buf) {
file_writer = _segment_file_writers[segid].get();
}
VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
+ if (file_writer == nullptr) {
+ return Status::Corruption("append_data failed, file writer {} is
destoryed", segid);
+ }
+ if (file_writer->bytes_appended() != offset) {
+ return Status::Corruption(
+ "append_data out-of-order in segment={}, expected offset={},
actual={}",
+ file_writer->path().native(), offset,
file_writer->bytes_appended());
+ }
return file_writer->append(buf.to_string());
}
Status LoadStreamWriter::close_segment(uint32_t segid) {
- auto st = _segment_file_writers[segid]->close();
+ io::FileWriter* file_writer = nullptr;
+ {
+ std::lock_guard lock_guard(_lock);
+ if (!_is_init) {
+ return Status::Corruption("close_segment failed, LoadStreamWriter
is not inited");
+ }
+ if (segid >= _segment_file_writers.size()) {
+ return Status::Corruption("close_segment failed, segment {} is
never opened", segid);
+ }
+ file_writer = _segment_file_writers[segid].get();
+ }
+ if (file_writer == nullptr) {
+ return Status::Corruption("close_segment failed, file writer {} is
destoryed", segid);
+ }
+ auto st = file_writer->close();
if (!st.ok()) {
_is_canceled = true;
return st;
}
- if (_segment_file_writers[segid]->bytes_appended() == 0) {
- return Status::Corruption("segment {} is zero bytes", segid);
+ LOG(INFO) << "segment " << segid << " path " <<
file_writer->path().native()
+ << "closed, written " << file_writer->bytes_appended() << "
bytes";
+ if (file_writer->bytes_appended() == 0) {
+ return Status::Corruption("segment {} closed with 0 bytes",
file_writer->path().native());
}
- LOG(INFO) << "segid " << segid << "path " <<
_segment_file_writers[segid]->path() << " written "
- << _segment_file_writers[segid]->bytes_appended() << " bytes";
return Status::OK();
}
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics&
stat,
TabletSchemaSPtr flush_schema) {
- if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) {
- LOG(WARNING) << _segment_file_writers[segid]->path() << " is
incomplete, actual size: "
- << _segment_file_writers[segid]->bytes_appended()
- << ", expected size: " << stat.data_size;
- return Status::Corruption("segment {} is incomplete, actual size: {},
expected size: {}",
-
_segment_file_writers[segid]->path().native(),
-
_segment_file_writers[segid]->bytes_appended(), stat.data_size);
+ io::FileWriter* file_writer = nullptr;
+ {
+ std::lock_guard lock_guard(_lock);
+ if (!_is_init) {
+ return Status::Corruption("add_segment failed, LoadStreamWriter is
not inited");
+ }
+ if (segid >= _segment_file_writers.size()) {
+ return Status::Corruption("add_segment failed, segment {} is never
opened", segid);
+ }
+ file_writer = _segment_file_writers[segid].get();
+ }
+ if (file_writer == nullptr) {
+ return Status::Corruption("add_segment failed, file writer {} is
destoryed", segid);
+ }
+ if (!file_writer->is_closed()) {
+ return Status::Corruption("add_segment failed, segment {} is not
closed",
+ file_writer->path().native());
+ }
+ if (file_writer->bytes_appended() != stat.data_size) {
+ return Status::Corruption(
+ "add_segment failed, segment stat {} does not match, file
size={}, "
+ "stat.data_size={}",
+ file_writer->path().native(), file_writer->bytes_appended(),
stat.data_size);
}
return _rowset_writer->add_segment(segid, stat, flush_schema);
}
@@ -152,12 +190,13 @@ Status LoadStreamWriter::close() {
<< "rowset builder is supposed be to initialized before
close_wait() being called";
if (_is_canceled) {
- return Status::Error<ErrorCode::INTERNAL_ERROR>("flush segment
failed");
+ return Status::InternalError("flush segment failed");
}
for (const auto& writer : _segment_file_writers) {
if (!writer->is_closed()) {
- return Status::Corruption("segment {} is not closed",
writer->path().native());
+ return Status::Corruption("LoadStreamWriter close failed, segment
{} is not closed",
+ writer->path().native());
}
}
diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h
index e038ceeb89b..ab6530bf60d 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -61,7 +61,7 @@ public:
Status init();
- Status append_data(uint32_t segid, butil::IOBuf buf);
+ Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf);
Status close_segment(uint32_t segid);
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index dd7a4a8ceca..535e941b3b6 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -185,7 +185,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
// APPEND_DATA
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
- int64_t segment_id, std::span<const Slice>
data,
+ int64_t segment_id, uint64_t offset,
std::span<const Slice> data,
bool segment_eos) {
PStreamHeader header;
header.set_src_id(_src_id);
@@ -195,6 +195,7 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
header.set_tablet_id(tablet_id);
header.set_segment_id(segment_id);
header.set_segment_eos(segment_eos);
+ header.set_offset(offset);
header.set_opcode(doris::PStreamHeader::APPEND_DATA);
return _encode_and_send(header, data);
}
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 786de57d759..edbbbda1e64 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -162,7 +162,8 @@ public:
// APPEND_DATA
Status
append_data(int64_t partition_id, int64_t index_id, int64_t
tablet_id,
- int64_t segment_id, std::span<const Slice> data, bool
segment_eos = false);
+ int64_t segment_id, uint64_t offset, std::span<const
Slice> data,
+ bool segment_eos = false);
// ADD_SEGMENT
Status add_segment(int64_t partition_id, int64_t index_id, int64_t
tablet_id,
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index c52b59f01e4..7e5bdd350f5 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -57,7 +57,7 @@ class StreamSinkFileWriterTest : public testing::Test {
// APPEND_DATA
virtual Status append_data(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
- int64_t segment_id, std::span<const Slice>
data,
+ int64_t segment_id, uint64_t offset,
std::span<const Slice> data,
bool segment_eos = false) override {
EXPECT_EQ(PARTITION_ID, partition_id);
EXPECT_EQ(INDEX_ID, index_id);
@@ -65,10 +65,12 @@ class StreamSinkFileWriterTest : public testing::Test {
EXPECT_EQ(SEGMENT_ID, segment_id);
if (segment_eos) {
EXPECT_EQ(0, data.size());
+ EXPECT_EQ(DATA0.length() + DATA1.length(), offset);
} else {
EXPECT_EQ(2, data.size());
EXPECT_EQ(DATA0, data[0].to_string());
EXPECT_EQ(DATA1, data[1].to_string());
+ EXPECT_EQ(0, offset);
}
g_num_request++;
return Status::OK();
diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp
index 247f9c6b6b5..b1ad0826177 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -513,8 +513,8 @@ public:
}
void write_one_tablet(MockSinkClient& client, UniqueId load_id, uint32_t
sender_id,
- int64_t index_id, int64_t tablet_id, uint32_t segid,
std::string& data,
- bool segment_eos) {
+ int64_t index_id, int64_t tablet_id, uint32_t segid,
uint64_t offset,
+ std::string& data, bool segment_eos) {
// append data
butil::IOBuf append_buf;
PStreamHeader header;
@@ -527,6 +527,7 @@ public:
header.set_segment_eos(segment_eos);
header.set_src_id(sender_id);
header.set_partition_id(NORMAL_PARTITION_ID);
+ header.set_offset(offset);
size_t hdr_len = header.ByteSizeLong();
append_buf.append((char*)&hdr_len, sizeof(size_t));
append_buf.append(header.SerializeAsString());
@@ -539,27 +540,27 @@ public:
void write_normal(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID,
- NORMAL_TABLET_ID, 0, NORMAL_STRING, true);
+ NORMAL_TABLET_ID, 0, 0, NORMAL_STRING, true);
}
void write_abnormal_load(MockSinkClient& client) {
write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID,
- NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+ NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void write_abnormal_index(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
ABNORMAL_INDEX_ID,
- NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+ NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void write_abnormal_sender(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, ABNORMAL_SENDER_ID,
NORMAL_INDEX_ID,
- NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+ NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void write_abnormal_tablet(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID,
- ABNORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+ ABNORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void wait_for_ack(int32_t num) {
@@ -710,7 +711,7 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
- EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
@@ -820,7 +821,7 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment0_zero_b
PStreamHeader header;
std::string data;
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
- data, true);
+ 0, data, true);
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD
@@ -861,9 +862,9 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment0) {
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
- data, false);
+ 0, data, false);
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
- data, true);
+ data.length(), data, true);
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD
@@ -907,7 +908,7 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment_without
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
- data, false);
+ 0, data, false);
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD
@@ -948,9 +949,9 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment1) {
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
- data, false);
+ 0, data, false);
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
- data, true);
+ data.length(), data, true);
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD
@@ -991,13 +992,13 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_two_segment) {
PStreamHeader header;
std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
- data1, false);
+ 0, data1, false);
std::string empty;
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
- empty, true);
+ data1.length(), empty, true);
std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID,
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
- data2, true);
+ 0, data2, true);
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD
@@ -1044,12 +1045,12 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_three_tablet) {
PStreamHeader header;
std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
- NORMAL_TABLET_ID + 0, 0, data1, true);
+ NORMAL_TABLET_ID + 0, 0, 0, data1, true);
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
- NORMAL_TABLET_ID + 1, 0, data1, true);
+ NORMAL_TABLET_ID + 1, 0, 0, data1, true);
std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
- NORMAL_TABLET_ID + 2, 0, data2, true);
+ NORMAL_TABLET_ID + 2, 0, 0, data2, true);
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD
@@ -1113,7 +1114,7 @@ TEST_F(LoadStreamMgrTest,
two_client_one_index_one_tablet_three_segment) {
std::string data1 =
"sender_id=" + std::to_string(i) + ",segid=" +
std::to_string(segid);
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i,
NORMAL_INDEX_ID,
- NORMAL_TABLET_ID, segid, data1, true);
+ NORMAL_TABLET_ID, segid, 0, data1, true);
segment_data[i * 3 + segid] = data1;
LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data1;
}
@@ -1186,7 +1187,7 @@ TEST_F(LoadStreamMgrTest,
two_client_one_close_before_the_other_open) {
for (int32_t segid = 2; segid >= 0; segid--) {
int i = 0;
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i,
NORMAL_INDEX_ID,
- NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid],
true);
+ NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 +
segid], true);
}
EXPECT_EQ(g_response_stat.num, 0);
@@ -1205,7 +1206,7 @@ TEST_F(LoadStreamMgrTest,
two_client_one_close_before_the_other_open) {
for (int32_t segid = 2; segid >= 0; segid--) {
int i = 1;
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i,
NORMAL_INDEX_ID,
- NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid],
true);
+ NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 +
segid], true);
}
close_load(clients[1], 1);
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 3676d854a94..ec3714d618a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -791,6 +791,7 @@ message PStreamHeader {
optional SegmentStatisticsPB segment_statistics = 9;
repeated PTabletID tablets = 10;
optional TabletSchemaPB flush_schema = 11;
+ optional uint64 offset = 12;
}
message PGetWalQueueSizeRequest{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]