This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e86f9bab5f [improve](move-memtable) include and check offset when append data (#28159)
4e86f9bab5f is described below

commit 4e86f9bab5fa561c6922d58343abe28578a72b52
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Dec 9 16:21:36 2023 +0800

    [improve](move-memtable) include and check offset when append data (#28159)
---
 be/src/io/fs/stream_sink_file_writer.cpp       | 10 ++--
 be/src/runtime/load_stream.cpp                 |  2 +-
 be/src/runtime/load_stream_writer.cpp          | 71 ++++++++++++++++++++------
 be/src/runtime/load_stream_writer.h            |  2 +-
 be/src/vec/sink/load_stream_stub.cpp           |  3 +-
 be/src/vec/sink/load_stream_stub.h             |  3 +-
 be/test/io/fs/stream_sink_file_writer_test.cpp |  4 +-
 be/test/runtime/load_stream_test.cpp           | 47 ++++++++---------
 gensrc/proto/internal_service.proto            |  1 +
 9 files changed, 94 insertions(+), 49 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index 9b2125c9446..484be9e07e9 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -44,7 +44,6 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
     for (int i = 0; i < data_cnt; i++) {
         bytes_req += data[i].get_size();
     }
-    _bytes_appended += bytes_req;
 
     VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
                << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id
@@ -52,9 +51,10 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
 
     std::span<const Slice> slices {data, data_cnt};
     for (auto& stream : _streams) {
-        RETURN_IF_ERROR(
-                stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id, slices));
+        RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, 
_tablet_id, _segment_id,
+                                            _bytes_appended, slices));
     }
+    _bytes_appended += bytes_req;
     return Status::OK();
 }
 
@@ -63,8 +63,8 @@ Status StreamSinkFileWriter::finalize() {
                << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id;
     // TODO(zhengyu): update get_inverted_index_file_size into stat
     for (auto& stream : _streams) {
-        RETURN_IF_ERROR(
-                stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id, {}, true));
+        RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, 
_tablet_id, _segment_id,
+                                            _bytes_appended, {}, true));
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 80d82956a0f..9d05d48f54c 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -127,7 +127,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
     butil::IOBuf buf = data->movable();
     auto flush_func = [this, new_segid, eos, buf, header]() {
-        auto st = _load_stream_writer->append_data(new_segid, buf);
+        auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf);
         if (eos && st.ok()) {
             st = _load_stream_writer->close_segment(new_segid);
         }
diff --git a/be/src/runtime/load_stream_writer.cpp 
b/be/src/runtime/load_stream_writer.cpp
index 427ace47d00..0a339e854aa 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -83,14 +83,14 @@ Status LoadStreamWriter::init() {
     return Status::OK();
 }
 
-Status LoadStreamWriter::append_data(uint32_t segid, butil::IOBuf buf) {
+Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, 
butil::IOBuf buf) {
     io::FileWriter* file_writer = nullptr;
     {
         std::lock_guard lock_guard(_lock);
         if (!_is_init) {
             RETURN_IF_ERROR(init());
         }
-        if (segid + 1 > _segment_file_writers.size()) {
+        if (segid >= _segment_file_writers.size()) {
             for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
                 Status st;
                 io::FileWriterPtr file_writer;
@@ -107,32 +107,70 @@ Status LoadStreamWriter::append_data(uint32_t segid, 
butil::IOBuf buf) {
         file_writer = _segment_file_writers[segid].get();
     }
     VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
+    if (file_writer == nullptr) {
+        return Status::Corruption("append_data failed, file writer {} is 
destoryed", segid);
+    }
+    if (file_writer->bytes_appended() != offset) {
+        return Status::Corruption(
+                "append_data out-of-order in segment={}, expected offset={}, 
actual={}",
+                file_writer->path().native(), offset, 
file_writer->bytes_appended());
+    }
     return file_writer->append(buf.to_string());
 }
 
 Status LoadStreamWriter::close_segment(uint32_t segid) {
-    auto st = _segment_file_writers[segid]->close();
+    io::FileWriter* file_writer = nullptr;
+    {
+        std::lock_guard lock_guard(_lock);
+        if (!_is_init) {
+            return Status::Corruption("close_segment failed, LoadStreamWriter 
is not inited");
+        }
+        if (segid >= _segment_file_writers.size()) {
+            return Status::Corruption("close_segment failed, segment {} is 
never opened", segid);
+        }
+        file_writer = _segment_file_writers[segid].get();
+    }
+    if (file_writer == nullptr) {
+        return Status::Corruption("close_segment failed, file writer {} is 
destoryed", segid);
+    }
+    auto st = file_writer->close();
     if (!st.ok()) {
         _is_canceled = true;
         return st;
     }
-    if (_segment_file_writers[segid]->bytes_appended() == 0) {
-        return Status::Corruption("segment {} is zero bytes", segid);
+    LOG(INFO) << "segment " << segid << " path " << 
file_writer->path().native()
+              << "closed, written " << file_writer->bytes_appended() << " 
bytes";
+    if (file_writer->bytes_appended() == 0) {
+        return Status::Corruption("segment {} closed with 0 bytes", 
file_writer->path().native());
     }
-    LOG(INFO) << "segid " << segid << "path " << 
_segment_file_writers[segid]->path() << " written "
-              << _segment_file_writers[segid]->bytes_appended() << " bytes";
     return Status::OK();
 }
 
 Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& 
stat,
                                      TabletSchemaSPtr flush_schema) {
-    if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) {
-        LOG(WARNING) << _segment_file_writers[segid]->path() << " is 
incomplete, actual size: "
-                     << _segment_file_writers[segid]->bytes_appended()
-                     << ", expected size: " << stat.data_size;
-        return Status::Corruption("segment {} is incomplete, actual size: {}, 
expected size: {}",
-                                  
_segment_file_writers[segid]->path().native(),
-                                  
_segment_file_writers[segid]->bytes_appended(), stat.data_size);
+    io::FileWriter* file_writer = nullptr;
+    {
+        std::lock_guard lock_guard(_lock);
+        if (!_is_init) {
+            return Status::Corruption("add_segment failed, LoadStreamWriter is 
not inited");
+        }
+        if (segid >= _segment_file_writers.size()) {
+            return Status::Corruption("add_segment failed, segment {} is never 
opened", segid);
+        }
+        file_writer = _segment_file_writers[segid].get();
+    }
+    if (file_writer == nullptr) {
+        return Status::Corruption("add_segment failed, file writer {} is 
destoryed", segid);
+    }
+    if (!file_writer->is_closed()) {
+        return Status::Corruption("add_segment failed, segment {} is not 
closed",
+                                  file_writer->path().native());
+    }
+    if (file_writer->bytes_appended() != stat.data_size) {
+        return Status::Corruption(
+                "add_segment failed, segment stat {} does not match, file 
size={}, "
+                "stat.data_size={}",
+                file_writer->path().native(), file_writer->bytes_appended(), 
stat.data_size);
     }
     return _rowset_writer->add_segment(segid, stat, flush_schema);
 }
@@ -152,12 +190,13 @@ Status LoadStreamWriter::close() {
             << "rowset builder is supposed be to initialized before 
close_wait() being called";
 
     if (_is_canceled) {
-        return Status::Error<ErrorCode::INTERNAL_ERROR>("flush segment 
failed");
+        return Status::InternalError("flush segment failed");
     }
 
     for (const auto& writer : _segment_file_writers) {
         if (!writer->is_closed()) {
-            return Status::Corruption("segment {} is not closed", 
writer->path().native());
+            return Status::Corruption("LoadStreamWriter close failed, segment 
{} is not closed",
+                                      writer->path().native());
         }
     }
 
diff --git a/be/src/runtime/load_stream_writer.h 
b/be/src/runtime/load_stream_writer.h
index e038ceeb89b..ab6530bf60d 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -61,7 +61,7 @@ public:
 
     Status init();
 
-    Status append_data(uint32_t segid, butil::IOBuf buf);
+    Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf);
 
     Status close_segment(uint32_t segid);
 
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index dd7a4a8ceca..535e941b3b6 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -185,7 +185,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
 
 // APPEND_DATA
 Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
-                                   int64_t segment_id, std::span<const Slice> 
data,
+                                   int64_t segment_id, uint64_t offset, 
std::span<const Slice> data,
                                    bool segment_eos) {
     PStreamHeader header;
     header.set_src_id(_src_id);
@@ -195,6 +195,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, 
int64_t index_id, int64
     header.set_tablet_id(tablet_id);
     header.set_segment_id(segment_id);
     header.set_segment_eos(segment_eos);
+    header.set_offset(offset);
     header.set_opcode(doris::PStreamHeader::APPEND_DATA);
     return _encode_and_send(header, data);
 }
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 786de57d759..edbbbda1e64 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -162,7 +162,8 @@ public:
             // APPEND_DATA
             Status
             append_data(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
-                        int64_t segment_id, std::span<const Slice> data, bool 
segment_eos = false);
+                        int64_t segment_id, uint64_t offset, std::span<const 
Slice> data,
+                        bool segment_eos = false);
 
     // ADD_SEGMENT
     Status add_segment(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp 
b/be/test/io/fs/stream_sink_file_writer_test.cpp
index c52b59f01e4..7e5bdd350f5 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -57,7 +57,7 @@ class StreamSinkFileWriterTest : public testing::Test {
 
         // APPEND_DATA
         virtual Status append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
-                                   int64_t segment_id, std::span<const Slice> 
data,
+                                   int64_t segment_id, uint64_t offset, 
std::span<const Slice> data,
                                    bool segment_eos = false) override {
             EXPECT_EQ(PARTITION_ID, partition_id);
             EXPECT_EQ(INDEX_ID, index_id);
@@ -65,10 +65,12 @@ class StreamSinkFileWriterTest : public testing::Test {
             EXPECT_EQ(SEGMENT_ID, segment_id);
             if (segment_eos) {
                 EXPECT_EQ(0, data.size());
+                EXPECT_EQ(DATA0.length() + DATA1.length(), offset);
             } else {
                 EXPECT_EQ(2, data.size());
                 EXPECT_EQ(DATA0, data[0].to_string());
                 EXPECT_EQ(DATA1, data[1].to_string());
+                EXPECT_EQ(0, offset);
             }
             g_num_request++;
             return Status::OK();
diff --git a/be/test/runtime/load_stream_test.cpp 
b/be/test/runtime/load_stream_test.cpp
index 247f9c6b6b5..b1ad0826177 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -513,8 +513,8 @@ public:
     }
 
     void write_one_tablet(MockSinkClient& client, UniqueId load_id, uint32_t 
sender_id,
-                          int64_t index_id, int64_t tablet_id, uint32_t segid, 
std::string& data,
-                          bool segment_eos) {
+                          int64_t index_id, int64_t tablet_id, uint32_t segid, 
uint64_t offset,
+                          std::string& data, bool segment_eos) {
         // append data
         butil::IOBuf append_buf;
         PStreamHeader header;
@@ -527,6 +527,7 @@ public:
         header.set_segment_eos(segment_eos);
         header.set_src_id(sender_id);
         header.set_partition_id(NORMAL_PARTITION_ID);
+        header.set_offset(offset);
         size_t hdr_len = header.ByteSizeLong();
         append_buf.append((char*)&hdr_len, sizeof(size_t));
         append_buf.append(header.SerializeAsString());
@@ -539,27 +540,27 @@ public:
 
     void write_normal(MockSinkClient& client) {
         write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID,
-                         NORMAL_TABLET_ID, 0, NORMAL_STRING, true);
+                         NORMAL_TABLET_ID, 0, 0, NORMAL_STRING, true);
     }
 
     void write_abnormal_load(MockSinkClient& client) {
         write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID,
-                         NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+                         NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
     }
 
     void write_abnormal_index(MockSinkClient& client) {
         write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
ABNORMAL_INDEX_ID,
-                         NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+                         NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
     }
 
     void write_abnormal_sender(MockSinkClient& client) {
         write_one_tablet(client, NORMAL_LOAD_ID, ABNORMAL_SENDER_ID, 
NORMAL_INDEX_ID,
-                         NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+                         NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
     }
 
     void write_abnormal_tablet(MockSinkClient& client) {
         write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID,
-                         ABNORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
+                         ABNORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
     }
 
     void wait_for_ack(int32_t num) {
@@ -710,7 +711,7 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
-    EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
 
     // server will close stream on CLOSE_LOAD
     wait_for_close();
@@ -820,7 +821,7 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment0_zero_b
     PStreamHeader header;
     std::string data;
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
-                     data, true);
+                     0, data, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
     // CLOSE_LOAD
@@ -861,9 +862,9 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment0) {
     PStreamHeader header;
     std::string data = "file1 hello world 123 !@#$%^&*()_+";
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
-                     data, false);
+                     0, data, false);
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
-                     data, true);
+                     data.length(), data, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
     // CLOSE_LOAD
@@ -907,7 +908,7 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment_without
     PStreamHeader header;
     std::string data = "file1 hello world 123 !@#$%^&*()_+";
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
-                     data, false);
+                     0, data, false);
 
     EXPECT_EQ(g_response_stat.num, 0);
     // CLOSE_LOAD
@@ -948,9 +949,9 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment1) {
     PStreamHeader header;
     std::string data = "file1 hello world 123 !@#$%^&*()_+";
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
-                     data, false);
+                     0, data, false);
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
-                     data, true);
+                     data.length(), data, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
     // CLOSE_LOAD
@@ -991,13 +992,13 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_two_segment) {
     PStreamHeader header;
     std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
-                     data1, false);
+                     0, data1, false);
     std::string empty;
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
-                     empty, true);
+                     data1.length(), empty, true);
     std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
-                     data2, true);
+                     0, data2, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
     // CLOSE_LOAD
@@ -1044,12 +1045,12 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_three_tablet) {
     PStreamHeader header;
     std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
-                     NORMAL_TABLET_ID + 0, 0, data1, true);
+                     NORMAL_TABLET_ID + 0, 0, 0, data1, true);
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
-                     NORMAL_TABLET_ID + 1, 0, data1, true);
+                     NORMAL_TABLET_ID + 1, 0, 0, data1, true);
     std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
     write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
-                     NORMAL_TABLET_ID + 2, 0, data2, true);
+                     NORMAL_TABLET_ID + 2, 0, 0, data2, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
     // CLOSE_LOAD
@@ -1113,7 +1114,7 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_index_one_tablet_three_segment) {
             std::string data1 =
                     "sender_id=" + std::to_string(i) + ",segid=" + 
std::to_string(segid);
             write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, 
NORMAL_INDEX_ID,
-                             NORMAL_TABLET_ID, segid, data1, true);
+                             NORMAL_TABLET_ID, segid, 0, data1, true);
             segment_data[i * 3 + segid] = data1;
             LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data1;
         }
@@ -1186,7 +1187,7 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_close_before_the_other_open) {
     for (int32_t segid = 2; segid >= 0; segid--) {
         int i = 0;
         write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, 
NORMAL_INDEX_ID,
-                         NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], 
true);
+                         NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + 
segid], true);
     }
 
     EXPECT_EQ(g_response_stat.num, 0);
@@ -1205,7 +1206,7 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_close_before_the_other_open) {
     for (int32_t segid = 2; segid >= 0; segid--) {
         int i = 1;
         write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, 
NORMAL_INDEX_ID,
-                         NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], 
true);
+                         NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + 
segid], true);
     }
 
     close_load(clients[1], 1);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 3676d854a94..ec3714d618a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -791,6 +791,7 @@ message PStreamHeader {
     optional SegmentStatisticsPB segment_statistics = 9;
     repeated PTabletID tablets = 10;
     optional TabletSchemaPB flush_schema = 11;
+    optional uint64 offset = 12;
 }
 
 message PGetWalQueueSizeRequest{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to