chaoyli closed pull request #483: Add AlphaRowsetBuilder
URL: https://github.com/apache/incubator-doris/pull/483
This pull request was merged from a forked (foreign) repository.
Because GitHub hides the original diff of such a pull request once it
has been merged, the full diff is reproduced below for the sake of
provenance:
diff --git a/be/src/olap/rowset/alpha_rowset_builder.cpp
b/be/src/olap/rowset/alpha_rowset_builder.cpp
index 147799af..cdb75702 100644
--- a/be/src/olap/rowset/alpha_rowset_builder.cpp
+++ b/be/src/olap/rowset/alpha_rowset_builder.cpp
@@ -16,23 +16,98 @@
// under the License.
#include "olap/rowset/alpha_rowset_builder.h"
+#include "olap/rowset/alpha_rowset_meta.h"
+#include "olap/rowset/alpha_rowset.h"
namespace doris {
-NewStatus AlphaRowsetBuilder::init(int64_t rowset_id, const std::string&
rowset_path_prefix, Schema* schema) {
- return NewStatus::OK();
+AlphaRowsetBuilder::AlphaRowsetBuilder() : _segment_group_id(0),
+ _cur_segment_group(nullptr),
+ _column_data_writer(nullptr),
+ _current_rowset_meta(nullptr) {
}
-NewStatus AlphaRowsetBuilder::add_row_block(const RowBlock& row_block) {
+NewStatus AlphaRowsetBuilder::init(const RowsetBuilderContext&
rowset_builder_context) {
+ _rowset_builder_context = rowset_builder_context;
+ _init();
+ _current_rowset_meta->set_rowset_id(_rowset_builder_context.rowset_id);
+ _current_rowset_meta->set_tablet_id(_rowset_builder_context.tablet_id);
+ _current_rowset_meta->set_txn_id(_rowset_builder_context.txn_id);
+
_current_rowset_meta->set_tablet_schema_hash(_rowset_builder_context.tablet_schema_hash);
+ _current_rowset_meta->set_rowset_type(_rowset_builder_context.rowset_type);
+ _current_rowset_meta->set_rowset_state(PREPARING);
+
_current_rowset_meta->set_rowset_path(_rowset_builder_context.rowset_path_prefix);
+ _current_rowset_meta->set_version(_rowset_builder_context.version);
+
_current_rowset_meta->set_version_hash(_rowset_builder_context.version_hash);
+ _current_rowset_meta->set_load_id(_rowset_builder_context.load_id);
return NewStatus::OK();
}
-NewStatus AlphaRowsetBuilder::generate_written_path(const std::string&
src_path, std::string* dest_path) {
+NewStatus AlphaRowsetBuilder::add_row(RowCursor* row) {
+ OLAPStatus status = _column_data_writer->attached_by(row);
+ if (status != OLAP_SUCCESS) {
+ std::string error_msg = "add row failed";
+ LOG(WARNING) << error_msg;
+ return NewStatus::Corruption(error_msg);
+ }
return NewStatus::OK();
}
-NewStatus AlphaRowsetBuilder::build(Rowset* rowset) {
+NewStatus AlphaRowsetBuilder::flush() {
+ _column_data_writer->finalize();
+ SAFE_DELETE(_column_data_writer);
+ _init();
return NewStatus::OK();
}
+std::shared_ptr<Rowset> AlphaRowsetBuilder::build() {
+ // TODO: set total_disk_size/data_disk_size/index_disk_size
+ for (auto& segment_group : _segment_groups) {
+ PendingSegmentGroupPB pending_segment_group_pb;
+
pending_segment_group_pb.set_pending_segment_group_id(segment_group->segment_group_id());
+
pending_segment_group_pb.set_num_segments(segment_group->num_segments());
+ PUniqueId* unique_id = pending_segment_group_pb.mutable_load_id();
+ unique_id->set_hi(_rowset_builder_context.load_id.hi());
+ unique_id->set_lo(_rowset_builder_context.load_id.lo());
+ pending_segment_group_pb.set_empty(segment_group->empty());
+ const std::vector<KeyRange>* column_statistics =
&(segment_group->get_column_statistics());
+ if (column_statistics != nullptr) {
+ for (size_t i = 0; i < column_statistics->size(); ++i) {
+ ColumnPruning* column_pruning =
pending_segment_group_pb.add_column_pruning();
+
column_pruning->set_min(column_statistics->at(i).first->to_string());
+
column_pruning->set_max(column_statistics->at(i).second->to_string());
+
column_pruning->set_null_flag(column_statistics->at(i).first->is_null());
+ }
+ }
+ AlphaRowsetMeta* alpha_rowset_meta =
(AlphaRowsetMeta*)_current_rowset_meta.get();
+ alpha_rowset_meta->add_pending_segment_group(pending_segment_group_pb);
+ }
+ Rowset* rowset = new(std::nothrow)
AlphaRowset(_rowset_builder_context.tablet_schema,
+ _rowset_builder_context.num_key_fields,
_rowset_builder_context.num_short_key_fields,
+ _rowset_builder_context.num_rows_per_row_block,
_rowset_builder_context.rowset_path_prefix,
+ _current_rowset_meta);
+ rowset->init();
+ return std::shared_ptr<Rowset>(rowset);
+}
+
+void AlphaRowsetBuilder::_init() {
+ _segment_group_id++;
+ _cur_segment_group = new SegmentGroup(_rowset_builder_context.tablet_id,
+ _rowset_builder_context.tablet_schema,
+ _rowset_builder_context.num_key_fields,
+ _rowset_builder_context.num_short_key_fields,
+ _rowset_builder_context.num_rows_per_row_block,
+ _rowset_builder_context.rowset_path_prefix,
+ false, _segment_group_id, 0, true,
+ _rowset_builder_context.partition_id,
_rowset_builder_context.txn_id);
+ DCHECK(_cur_segment_group != nullptr) << "failed to malloc SegmentGroup";
+ _cur_segment_group->acquire();
+ _cur_segment_group->set_load_id(_rowset_builder_context.load_id);
+ _segment_groups.push_back(_cur_segment_group);
+
+ _column_data_writer = ColumnDataWriter::create(_cur_segment_group, true,
+ _rowset_builder_context.compress_kind,
_rowset_builder_context.bloom_filter_fpp);
+ DCHECK(_column_data_writer != nullptr) << "memory error occur when
creating writer";
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/alpha_rowset_builder.h
b/be/src/olap/rowset/alpha_rowset_builder.h
index a89a3556..a8aa2293 100644
--- a/be/src/olap/rowset/alpha_rowset_builder.h
+++ b/be/src/olap/rowset/alpha_rowset_builder.h
@@ -19,24 +19,40 @@
#define DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_BUILDER_H
#include "olap/rowset/rowset_builder.h"
+#include "olap/rowset/segment_group.h"
+#include "olap/rowset/column_data_writer.h"
+#include "olap/field_info.h"
+
+#include <vector>
namespace doris {
class AlphaRowsetBuilder : public RowsetBuilder {
public:
- virtual NewStatus init(int64_t rowset_id, const std::string&
rowset_path_prefix, Schema* schema);
+ AlphaRowsetBuilder();
+
+ virtual NewStatus init(const RowsetBuilderContext& rowset_builder_context);
// add a row block to rowset
- virtual NewStatus add_row_block(const RowBlock& row_block);
+ virtual NewStatus add_row(RowCursor* row);
- // this is a temp api
- // it is used to get rewritten path for writing rowset data
- virtual NewStatus generate_written_path(const std::string& src_path,
std::string* dest_path);
+ virtual NewStatus flush();
// get a rowset
- virtual NewStatus build(Rowset* rowset);
+ virtual std::shared_ptr<Rowset> build();
+
+private:
+ void _init();
+
+private:
+ int32_t _segment_group_id;
+ SegmentGroup* _cur_segment_group;
+ ColumnDataWriter* _column_data_writer;
+ std::shared_ptr<RowsetMeta> _current_rowset_meta;
+ RowsetBuilderContext _rowset_builder_context;
+ std::vector<SegmentGroup*> _segment_groups;
};
-}
+} // namespace doris
#endif // DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_BUILDER_H
\ No newline at end of file
diff --git a/be/src/olap/rowset/alpha_rowset_meta.cpp
b/be/src/olap/rowset/alpha_rowset_meta.cpp
index dd266319..5d6c886a 100644
--- a/be/src/olap/rowset/alpha_rowset_meta.cpp
+++ b/be/src/olap/rowset/alpha_rowset_meta.cpp
@@ -17,6 +17,8 @@
#include "olap/rowset/alpha_rowset_meta.h"
+#include "common/logging.h"
+
namespace doris {
bool AlphaRowsetMeta::deserialize_extra_properties() {
@@ -36,6 +38,28 @@ void AlphaRowsetMeta::add_segment_group(const
SegmentGroupPB& segment_group) {
*new_segment_group = segment_group;
}
+void AlphaRowsetMeta::get_pending_segment_groups(
+ std::vector<PendingSegmentGroupPB>* pending_segment_groups) {
+ for (auto& pending_segment_group :
_extra_meta_pb.pending_segment_groups()) {
+ pending_segment_groups->push_back(pending_segment_group);
+ }
+}
+
+void AlphaRowsetMeta::add_pending_segment_group(const PendingSegmentGroupPB&
pending_segment_group) {
+ for (int i = 0; i < _extra_meta_pb.pending_segment_groups_size(); i++) {
+ const PendingSegmentGroupPB& present_segment_group =
_extra_meta_pb.pending_segment_groups(i);
+ if (present_segment_group.pending_segment_group_id() ==
+ pending_segment_group.pending_segment_group_id()) {
+ LOG(WARNING) << "pending segment_group already exists in meta."
+ << "rowset_id:" << get_rowset_id()
+ << ", pending_segment_group_id: " <<
pending_segment_group.pending_segment_group_id();
+ return;
+ }
+ }
+ PendingSegmentGroupPB* new_pending_segment_group =
_extra_meta_pb.add_pending_segment_groups();
+ *new_pending_segment_group = pending_segment_group;
+}
+
void AlphaRowsetMeta::_serialize_extra_meta_pb() {
std::string extra_properties;
_extra_meta_pb.SerializeToString(&extra_properties);
diff --git a/be/src/olap/rowset/alpha_rowset_meta.h
b/be/src/olap/rowset/alpha_rowset_meta.h
index f69fb993..5518ae60 100644
--- a/be/src/olap/rowset/alpha_rowset_meta.h
+++ b/be/src/olap/rowset/alpha_rowset_meta.h
@@ -34,6 +34,10 @@ class AlphaRowsetMeta : public RowsetMeta {
void add_segment_group(const SegmentGroupPB& segment_group);
+ void get_pending_segment_groups(std::vector<PendingSegmentGroupPB>*
pending_segment_groups);
+
+ void add_pending_segment_group(const PendingSegmentGroupPB&
pending_segment_group);
+
private:
void _serialize_extra_meta_pb();
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 33fe08bb..04281236 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -35,7 +35,7 @@ class Rowset {
public:
virtual ~Rowset() { }
- virtual NewStatus init(const RowsetMeta& rowset_meta) = 0;
+ virtual NewStatus init() = 0;
virtual std::unique_ptr<RowsetReader> create_reader() = 0;
diff --git a/be/src/olap/rowset/rowset_builder.h
b/be/src/olap/rowset/rowset_builder.h
index 19db9fba..4787abd1 100644
--- a/be/src/olap/rowset/rowset_builder.h
+++ b/be/src/olap/rowset/rowset_builder.h
@@ -22,28 +22,46 @@
#include "olap/new_status.h"
#include "olap/schema.h"
#include "olap/row_block.h"
+#include "gen_cpp/types.pb.h"
namespace doris {
class Rowset;
+struct RowsetBuilderContext {
+ int64_t tablet_id;
+ int tablet_schema_hash;
+ int64_t rowset_id;
+ RowsetTypePB rowset_type;
+ std::string rowset_path_prefix;
+ RowFields tablet_schema;
+ int64_t partition_id;
+ int64_t txn_id;
+ int num_key_fields;
+ int num_short_key_fields;
+ int num_rows_per_row_block;
+ Version version;
+ VersionHash version_hash;
+ PUniqueId load_id;
+ CompressKind compress_kind;
+ double bloom_filter_fpp;
+};
+
class RowsetBuilder {
public:
virtual ~RowsetBuilder() { }
- virtual NewStatus init(int64_t rowset_id, const std::string&
rowset_path_prefix, Schema* schema) = 0;
+ virtual NewStatus init(const RowsetBuilderContext& rowset_builder_context)
= 0;
- // add a row block to rowset
- virtual NewStatus add_row_block(RowBlock* row_block) = 0;
+ // add a row to rowset
+ virtual NewStatus add_row(RowCursor* row_block) = 0;
- // this is a temp api
- // it is used to get rewritten path for writing rowset data
- virtual NewStatus generate_written_path(const std::string& src_path,
std::string* dest_path) = 0;
+ virtual NewStatus flush() = 0;
// get a rowset
- virtual NewStatus build(Rowset* rowset) = 0;
+ virtual std::shared_ptr<Rowset> build() = 0;
};
-}
+} // namespace doris
#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_BUILDER_H
\ No newline at end of file
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 8238f46c..637de695 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -77,6 +77,14 @@ class RowsetMeta {
_rowset_meta_pb.set_tablet_id(tablet_id);
}
+ virtual int64_t get_txn_id() {
+ return _rowset_meta_pb.txn_id();
+ }
+
+ virtual void set_txn_id(int64_t txn_id) {
+ _rowset_meta_pb.set_txn_id(txn_id);
+ }
+
virtual int32_t get_tablet_schema_hash() {
return _rowset_meta_pb.tablet_schema_hash();
}
@@ -196,12 +204,30 @@ class RowsetMeta {
*new_delete_condition = delete_predicate;
}
- virtual int64_t get_txn_id() {
- return _rowset_meta_pb.txn_id();
+ virtual bool get_empty() {
+ return _rowset_meta_pb.empty();
}
- virtual void set_txn_id(int64_t txn_id) {
- _rowset_meta_pb.set_txn_id(txn_id);
+ virtual void set_empty(bool empty) {
+ _rowset_meta_pb.set_empty(empty);
+ }
+
+ virtual std::string get_rowset_path() {
+ return _rowset_meta_pb.rowset_path();
+ }
+
+ virtual void set_rowset_path(std::string rowset_path) {
+ _rowset_meta_pb.set_rowset_path(rowset_path);
+ }
+
+ virtual PUniqueId get_load_id() {
+ return _rowset_meta_pb.load_id();
+ }
+
+ virtual void set_load_id(PUniqueId load_id) {
+ PUniqueId* new_load_id = _rowset_meta_pb.mutable_load_id();
+ new_load_id->set_hi(load_id.hi());
+ new_load_id->set_lo(load_id.lo());
}
std::string get_extra_properties() {
@@ -230,4 +256,4 @@ class RowsetMeta {
} // namespace doris
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_H
+#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_H
\ No newline at end of file
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index fa56c20c..4900434b 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -88,12 +88,14 @@ message RowsetMetaPB {
optional DeletePredicatePB delete_predicate = 15;
optional bool empty = 16;
optional string rowset_path = 17;
+ required PUniqueId load_id = 18;
// spare field id for future use
optional bytes extra_properties = 50;
}
message AlphaRowsetExtraMetaPB {
repeated SegmentGroupPB segment_groups = 1;
+ repeated PendingSegmentGroupPB pending_segment_groups = 2;
}
message SegmentGroupPB {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]