This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c775f8e7bd [feature](move-memtable)[2/7] add protos for memtable on sink node (#23348)
c775f8e7bd is described below
commit c775f8e7bd5405f4a9c83538b516f53c9630debb
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Aug 24 11:11:46 2023 +0800
[feature](move-memtable)[2/7] add protos for memtable on sink node (#23348)
Co-authored-by: zhengyu <[email protected]>
Co-authored-by: laihui <[email protected]>
---
be/src/olap/rowset/rowset_writer.h | 24 +++++++++++++++++
gensrc/proto/internal_service.proto | 52 +++++++++++++++++++++++++++++++++++++
gensrc/proto/olap_file.proto | 7 +++++
3 files changed, 83 insertions(+)
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 61cb20498a..21637a2379 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -24,6 +24,7 @@
#include <optional>
#include "common/factory_creator.h"
+#include "gen_cpp/olap_file.pb.h"
#include "gutil/macros.h"
#include "olap/column_mapping.h"
#include "olap/rowset/rowset.h"
@@ -38,7 +39,30 @@ struct SegmentStatistics {
int64_t data_size;
int64_t index_size;
KeyBoundsPB key_bounds;
+
+ SegmentStatistics() = default;
+
+ SegmentStatistics(SegmentStatisticsPB pb)
+ : row_num(pb.row_num()),
+ data_size(pb.data_size()),
+ index_size(pb.index_size()),
+ key_bounds(pb.key_bounds()) {}
+
+ void to_pb(SegmentStatisticsPB* segstat_pb) {
+ segstat_pb->set_row_num(row_num);
+ segstat_pb->set_data_size(data_size);
+ segstat_pb->set_index_size(index_size);
+ segstat_pb->mutable_key_bounds()->CopyFrom(key_bounds);
+ }
+
+ std::string to_string() {
+ std::stringstream ss;
+ ss << "row_num: " << row_num << ", data_size: " << data_size
+ << ", index_size: " << index_size << ", key_bounds: " << key_bounds.ShortDebugString();
+ return ss.str();
+ }
};
+using SegmentStatisticsSharedPtr = std::shared_ptr<SegmentStatistics>;
class RowsetWriter {
public:
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 830ed3c41a..bdfc0d823a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -61,6 +61,12 @@ message PTabletWithPartition {
required int64 tablet_id = 2;
}
+message PTabletID {
+ optional int64 partition_id = 1;
+ optional int64 index_id = 2;
+ optional int64 tablet_id = 3;
+}
+
message PTabletInfo {
required int64 tablet_id = 1;
required int32 schema_hash = 2;
@@ -692,6 +698,51 @@ message PGlobResponse {
repeated PFileInfo files = 2;
}
+message POpenStreamSinkRequest {
+ optional PUniqueId load_id = 1;
+ optional int64 txn_id = 2;
+ optional int64 src_id = 3;
+ optional POlapTableSchemaParam schema = 4;
+ repeated PTabletID tablets = 5;
+ optional bool enable_profile = 6 [default = false];
+}
+
+message PTabletSchemaWithIndex {
+ optional int64 index_id = 1;
+ optional TabletSchemaPB tablet_schema = 2;
+ optional bool enable_unique_key_merge_on_write = 3;
+}
+
+message POpenStreamSinkResponse {
+ optional PStatus status = 1;
+ repeated PTabletSchemaWithIndex tablet_schemas = 2;
+}
+
+message PWriteStreamSinkResponse {
+ optional PStatus status = 1;
+ repeated int64 success_tablet_ids = 2;
+ repeated int64 failed_tablet_ids = 3;
+ optional bytes load_stream_profile = 4;
+}
+
+message PStreamHeader {
+ enum Opcode {
+ APPEND_DATA = 1;
+ CLOSE_LOAD = 2;
+ ADD_SEGMENT = 3;
+ }
+ optional PUniqueId load_id = 1;
+ optional int64 partition_id = 2;
+ optional int64 index_id = 3;
+ optional int64 tablet_id = 4;
+ optional int32 segment_id = 5;
+ optional Opcode opcode = 6;
+ optional bool segment_eos = 7;
+ optional int64 src_id = 8;
+ optional SegmentStatisticsPB segment_statistics = 9;
+ repeated PTabletID tablets_to_commit = 10;
+}
+
service PBackendService {
rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -703,6 +754,7 @@ service PBackendService {
rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult);
rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
+ rpc open_stream_sink(POpenStreamSinkRequest) returns (POpenStreamSinkResponse);
rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult);
rpc tablet_writer_add_block_by_http(PEmptyRequest) returns (PTabletWriterAddBlockResult);
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 9a1a2686aa..fe3ac7915c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -115,6 +115,13 @@ message RowsetMetaPB {
optional SegmentsOverlapPB segments_overlap_pb = 51 [default = OVERLAP_UNKNOWN];
}
+message SegmentStatisticsPB {
+ optional int64 row_num = 1;
+ optional int64 data_size = 2;
+ optional int64 index_size = 3;
+ optional KeyBoundsPB key_bounds = 4;
+}
+
// kv value for reclaiming remote rowset
message RemoteRowsetGcPB {
required string resource_id = 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]