chaoyli closed pull request #479: Fix tablet compilation error
URL: https://github.com/apache/incubator-doris/pull/479
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork-based) pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index f1c55fe3..7f2a2b2b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -2303,7 +2303,7 @@ AgentStatus TaskWorkerPool::_move_dir(
}
std::string dest_tablet_dir = tablet->construct_dir_path();
- std::string store_path = tablet->store()->path();
+ std::string store_path = tablet->data_dir()->path();
SnapshotLoader* loader = _env->snapshot_loader();
Status status = loader->move(src, dest_tablet_dir, store_path, job_id,
overwrite);
diff --git a/be/src/http/action/meta_action.cpp
b/be/src/http/action/meta_action.cpp
index 7cc3bee3..e5dbedcf 100644
--- a/be/src/http/action/meta_action.cpp
+++ b/be/src/http/action/meta_action.cpp
@@ -54,7 +54,7 @@ Status MetaAction::_handle_header(HttpRequest *req,
std::string* json_header) {
LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema
hash:" << schema_hash;
return Status("no tablet exist");
}
- OLAPStatus s = TabletMetaManager::get_json_header(tablet->store(),
tablet_id, schema_hash, json_header);
+ OLAPStatus s = TabletMetaManager::get_json_header(tablet->data_dir(),
tablet_id, schema_hash, json_header);
if (s == OLAP_ERR_META_KEY_NOT_FOUND) {
return Status("no header exist");
} else if (s != OLAP_SUCCESS) {
diff --git a/be/src/http/action/restore_tablet_action.cpp
b/be/src/http/action/restore_tablet_action.cpp
index 1030a596..20d5d4ba 100644
--- a/be/src/http/action/restore_tablet_action.cpp
+++ b/be/src/http/action/restore_tablet_action.cpp
@@ -207,7 +207,7 @@ Status RestoreTabletAction::_restore(const std::string&
key, int64_t tablet_id,
return Status("create link path failed");
}
}
- std::string restore_shard_path =
store->get_absolute_shard_path(std::to_string(header.shard()));
+ std::string restore_shard_path =
store->get_absolute_shard_path(std::to_string(header.shard_id()));
Status status = _reload_tablet(key, restore_shard_path, tablet_id,
schema_hash);
return status;
}
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 4f4f774f..7eca9ffc 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -322,10 +322,14 @@ OLAPStatus
BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
vector<ColumnData*>*
base_data_sources,
uint64_t* row_count) {
// 1. 生成新base文件对应的olap index
+ /*
SegmentGroup* new_base = new (std::nothrow) SegmentGroup(_tablet.get(),
_new_base_version,
new_base_version_hash,
false, 0, 0);
+ */
+
+ SegmentGroup* new_base = nullptr;
if (new_base == NULL) {
OLAP_LOG_WARNING("fail to new SegmentGroup.");
return OLAP_ERR_MALLOC_ERROR;
@@ -440,7 +444,7 @@ OLAPStatus BaseCompaction::_update_header(uint64_t
row_count, vector<SegmentGrou
// Base Compaction完成之后,需要删除header中版本号小于等于新base文件版本号的删除条件
DeleteConditionHandler cond_handler;
- cond_handler.delete_cond(_tablet, _new_base_version.second, true);
+ cond_handler.delete_cond(nullptr, _new_base_version.second, true);
// 如果保存Header失败, 所有新增的信息会在下次启动时丢失, 属于严重错误
// 暂时没办法做很好的处理,报FATAL
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index f771e2dc..f29a55c0 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -134,10 +134,13 @@ OLAPStatus CumulativeCompaction::run() {
do {
// 3. 生成新cumulative文件对应的olap index
+ /*
_new_segment_group = new (nothrow) SegmentGroup(_tablet.get(),
_cumulative_version,
_cumulative_version_hash,
false, 0, 0);
+ */
+ _new_segment_group = nullptr;
if (_new_segment_group == NULL) {
OLAP_LOG_WARNING("failed to malloc new cumulative olap index. "
"[tablet=%s; cumulative_version=%d-%d]",
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 7ef19358..904570df 100755
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -450,11 +450,11 @@ std::string DataDir::get_absolute_shard_path(const
std::string& shard_string) {
std::string DataDir::get_absolute_tablet_path(TabletMeta* header, bool
with_schema_hash) {
if (with_schema_hash) {
- return _path + DATA_PREFIX + "/" + std::to_string(header->shard())
+ return _path + DATA_PREFIX + "/" + std::to_string(header->shard_id())
+ "/" + std::to_string(header->tablet_id()) + "/" +
std::to_string(header->schema_hash());
} else {
- return _path + DATA_PREFIX + "/" + std::to_string(header->shard())
+ return _path + DATA_PREFIX + "/" + std::to_string(header->shard_id())
+ "/" + std::to_string(header->tablet_id());
}
}
diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 390b24ab..d930fbb1 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -63,7 +63,7 @@ string DeleteConditionHandler::construct_sub_conditions(const
TCondition& condit
// 删除指定版本号的删除条件;需要注意的是,如果table上没有任何删除条件,或者
// 指定版本号的删除条件不存在,也会返回OLAP_SUCCESS。
-OLAPStatus DeleteConditionHandler::delete_cond(del_cond_array*
delete_condition,
+OLAPStatus DeleteConditionHandler::delete_cond(del_cond_array*
delete_conditions,
const int32_t version,
bool delete_smaller_version_conditions) {
if (version < 0) {
@@ -81,7 +81,7 @@ OLAPStatus
DeleteConditionHandler::delete_cond(del_cond_array* delete_condition,
// 1. 如果删除条件的版本号等于形参指定的版本号,则删除该版本的文件;
// 2. 如果还指定了delete_smaller_version_conditions为true,则同时删除
// 版本号小于指定版本号的删除条件;否则不删除。
- DeleteConditionMessage temp = delete_conditions->Get(index);
+ DeletePredicatePB temp = delete_conditions->Get(index);
if (temp.version() == version ||
(temp.version() < version &&
delete_smaller_version_conditions)) {
@@ -114,7 +114,7 @@ OLAPStatus DeleteConditionHandler::log_conds(std::string
tablet_full_name,
LOG(INFO) << "display all delete condition. tablet=" << tablet_full_name;
for (int index = 0; index != delete_conditions.size(); ++index) {
- DeleteConditionMessage temp = delete_conditions.Get(index);
+ DeletePredicatePB temp = delete_conditions.Get(index);
string del_cond_str;
const RepeatedPtrField<string>& sub_conditions = temp.sub_conditions();
@@ -210,9 +210,9 @@ OLAPStatus
DeleteConditionHandler::_check_version_valid(std::vector<Version>* al
const int32_t filter_version) {
// 找到当前最大的delta文件版本号
int max_delta_version = -1;
- vector<Version>::const_iterator version_iter = all_file_versions.begin();
+ vector<Version>::const_iterator version_iter = all_file_versions->begin();
- for (; version_iter != all_file_versions.end(); ++version_iter) {
+ for (; version_iter != all_file_versions->end(); ++version_iter) {
if (version_iter->second > max_delta_version) {
max_delta_version = version_iter->second;
}
@@ -235,7 +235,7 @@ int
DeleteConditionHandler::_check_whether_condition_exist(const del_cond_array&
int index = 0;
while (index != delete_conditions.size()) {
- DeleteConditionMessage temp = delete_conditions.Get(index);
+ DeletePredicatePB temp = delete_conditions.Get(index);
if (temp.version() == cond_version) {
return index;
diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h
index b7d83dc9..33abc1fb 100644
--- a/be/src/olap/delete_handler.h
+++ b/be/src/olap/delete_handler.h
@@ -47,7 +47,7 @@ namespace doris {
// * 在调用log_conds()的时候,只需要加读锁
class DeleteConditionHandler {
public:
- typedef google::protobuf::RepeatedPtrField<DeleteConditionMessage>
del_cond_array;
+ typedef google::protobuf::RepeatedPtrField<DeletePredicatePB>
del_cond_array;
DeleteConditionHandler() {}
~DeleteConditionHandler() {}
@@ -130,7 +130,7 @@ struct DeleteConditions {
class DeleteHandler {
public:
typedef std::vector<DeleteConditions>::size_type cond_num_t;
- typedef google::protobuf::RepeatedPtrField<DeleteConditionMessage>
del_cond_array;
+ typedef google::protobuf::RepeatedPtrField<DeletePredicatePB>
del_cond_array;
DeleteHandler() : _is_inited(false) {}
~DeleteHandler() {}
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 74c892e3..ac956682 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -103,8 +103,9 @@ OLAPStatus DeltaWriter::init() {
}
++_segment_group_id;
- _cur_segment_group = new SegmentGroup(_tablet.get(), false,
_segment_group_id, 0, true,
- _req.partition_id, _req.transaction_id);
+ //_cur_segment_group = new SegmentGroup(_tablet.get(), false,
_segment_group_id, 0, true,
+ //_req.partition_id, _req.transaction_id);
+ _cur_segment_group = nullptr;
DCHECK(_cur_segment_group != nullptr) << "failed to malloc SegmentGroup";
_cur_segment_group->acquire();
_cur_segment_group->set_load_id(_req.load_id);
@@ -113,7 +114,7 @@ OLAPStatus DeltaWriter::init() {
// New Writer to write data into SegmentGroup
VLOG(3) << "init writer. tablet=" << _tablet->full_name() << ", "
<< "block_row_size=" << _tablet->num_rows_per_row_block();
- _writer = ColumnDataWriter::create(_tablet, _cur_segment_group, true);
+ _writer = ColumnDataWriter::create(_cur_segment_group, true,
_tablet->compress_kind(), _tablet->bloom_filter_fpp());
DCHECK(_writer != nullptr) << "memory error occur when creating writer";
const std::vector<SlotDescriptor*>& slots = _req.tuple_desc->slots();
@@ -145,15 +146,16 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
RETURN_NOT_OK(_mem_table->flush(_writer));
++_segment_group_id;
- _cur_segment_group = new SegmentGroup(_tablet.get(), false,
_segment_group_id, 0, true,
- _req.partition_id, _req.transaction_id);
+ //_cur_segment_group = new SegmentGroup(_tablet.get(), false,
_segment_group_id, 0, true,
+ // _req.partition_id, _req.transaction_id);
+ _cur_segment_group = nullptr;
DCHECK(_cur_segment_group != nullptr) << "failed to malloc
SegmentGroup";
_cur_segment_group->acquire();
_cur_segment_group->set_load_id(_req.load_id);
_segment_group_vec.push_back(_cur_segment_group);
SAFE_DELETE(_writer);
- _writer = ColumnDataWriter::create(_tablet, _cur_segment_group, true);
+ _writer = ColumnDataWriter::create(_cur_segment_group, true,
_tablet->compress_kind(), _tablet->bloom_filter_fpp());
DCHECK(_writer != nullptr) << "memory error occur when creating
writer";
SAFE_DELETE(_mem_table);
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 2afd25ce..2f6f99d6 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -22,7 +22,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/schema_change.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
#include "runtime/descriptors.h"
#include "runtime/tuple.h"
#include "gen_cpp/internal_service.pb.h"
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 787ce63a..0ebf153a 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -18,7 +18,7 @@
#include "olap/memtable.h"
#include "olap/hll.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
#include "olap/row_cursor.h"
#include "util/runtime_profile.h"
#include "util/debug_util.h"
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index d28e61ef..0b57f11f 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -26,7 +26,7 @@
#include "olap/tablet.h"
#include "olap/reader.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
using std::list;
using std::string;
@@ -62,7 +62,8 @@ OLAPStatus Merger::merge(const vector<ColumnData*>&
olap_data_arr,
}
// create and initiate writer for generating new index and data files.
- unique_ptr<ColumnDataWriter> writer(ColumnDataWriter::create(_tablet,
_segment_group, false));
+ unique_ptr<ColumnDataWriter>
writer(ColumnDataWriter::create(_segment_group, false,
_tablet->compress_kind(), _tablet->bloom_filter_fpp()));
+ DCHECK(_writer != nullptr) << "memory error occur when creating writer";
if (NULL == writer) {
OLAP_LOG_WARNING("fail to allocate writer.");
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 123eb632..f85e1612 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -264,4 +264,4 @@ typedef std::map<ColumnId, ColumnId> UniqueIdToColumnIdMap;
} // namespace doris
-#endif // DORIS_BE_SRC_OLAP_OLAP_COMMON_H
\ No newline at end of file
+#endif // DORIS_BE_SRC_OLAP_OLAP_COMMON_H
diff --git a/be/src/olap/olap_cond.h b/be/src/olap/olap_cond.h
index e8f48613..67d1bfcf 100644
--- a/be/src/olap/olap_cond.h
+++ b/be/src/olap/olap_cond.h
@@ -24,6 +24,7 @@
#include <unordered_set>
#include <vector>
+#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/column_data_file.pb.h"
#include "olap/bloom_filter.hpp"
#include "olap/stream_index_common.h"
diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp
index f10c45c3..a45c3ec1 100644
--- a/be/src/olap/olap_snapshot.cpp
+++ b/be/src/olap/olap_snapshot.cpp
@@ -358,7 +358,7 @@ OLAPStatus StorageEngine::_create_snapshot_files(
}
// load tablet header, in order to remove versions that not in
shortest version path
- DataDir* store = ref_tablet->store();
+ DataDir* store = ref_tablet->data_dir();
new_tablet_meta = new(nothrow) TabletMeta();
if (new_tablet_meta == NULL) {
OLAP_LOG_WARNING("fail to malloc TabletMeta.");
@@ -661,7 +661,7 @@ OLAPStatus StorageEngine::storage_medium_migrate(
return OLAP_SUCCESS;
}
- TStorageMedium::type src_storage_medium =
tablet->store()->storage_medium();
+ TStorageMedium::type src_storage_medium =
tablet->data_dir()->storage_medium();
if (src_storage_medium == storage_medium) {
LOG(INFO) << "tablet is already on specified storage medium. "
<< "storage_medium=" << storage_medium;
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index 4f254302..6c03c456 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -29,7 +29,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/segment_group.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
namespace doris {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index f1f205d6..53b1fe6d 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -36,6 +36,7 @@
#include "util/runtime_profile.h"
#include "olap/column_predicate.h"
+#include "olap/tablet.h"
namespace doris {
@@ -44,6 +45,7 @@ class RowCursor;
class RowBlock;
class CollectIterator;
class RuntimeState;
+class ColumnData;
// Params for Reader,
// mainly include tablet, data version and fetch range.
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index a53368d4..33fe08bb 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -46,4 +46,4 @@ class Rowset {
} // namespace doris
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H
\ No newline at end of file
+#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 2685915b..8238f46c 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -187,12 +187,12 @@ class RowsetMeta {
*new_column_statistic = column_statistic;
}
- virtual const DeleteConditionMessage& get_delete_predicate() {
+ virtual const DeletePredicatePB& get_delete_predicate() {
return _rowset_meta_pb.delete_predicate();
}
- virtual void set_delete_predicate(DeleteConditionMessage&
delete_predicate) {
- DeleteConditionMessage* new_delete_condition =
_rowset_meta_pb.mutable_delete_predicate();
+ virtual void set_delete_predicate(DeletePredicatePB& delete_predicate) {
+ DeletePredicatePB* new_delete_condition =
_rowset_meta_pb.mutable_delete_predicate();
*new_delete_condition = delete_predicate;
}
@@ -230,4 +230,4 @@ class RowsetMeta {
} // namespace doris
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_H
\ No newline at end of file
+#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_H
diff --git a/be/src/olap/rowset/segment_group.h
b/be/src/olap/rowset/segment_group.h
index eb31a46a..989e1dab 100644
--- a/be/src/olap/rowset/segment_group.h
+++ b/be/src/olap/rowset/segment_group.h
@@ -294,9 +294,8 @@ class SegmentGroup {
std::vector<std::pair<WrapperField*, WrapperField*>> _column_statistics;
std::unordered_map<uint32_t, FileHeader<ColumnDataHeaderMessage> >
_seg_pb_map;
- DISALLOW_COPY_AND_ASSIGN(SegmentGroup);
};
} // namespace doris
-#endif // DORIS_BE_SRC_OLAP_ROWSET_SEGMENT_GROUP_H
\ No newline at end of file
+#endif // DORIS_BE_SRC_OLAP_ROWSET_SEGMENT_GROUP_H
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index c5d39073..06bf704c 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -44,7 +44,7 @@
#include "olap/schema_change.h"
#include "olap/data_dir.h"
#include "olap/utils.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
#include "util/time.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 84a7688a..dcd6e98c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -39,7 +39,9 @@
#include "util/defer_op.h"
#include "olap/tablet_meta_manager.h"
#include "olap/utils.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
+#include "olap/rowset/column_data.h"
+#include "olap/rowset/segment_group.h"
using std::pair;
using std::map;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c7859994..0807e90c 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -42,14 +42,171 @@ class Tablet;
class RowBlockPosition;
class DataDir;
class RowsetReader;
+class ColumnData;
+class SegmentGroup;
using TabletSharedPtr = std::shared_ptr<Tablet>;
+struct SchemaChangeStatus {
+ SchemaChangeStatus() : status(ALTER_TABLE_WAITING), schema_hash(0),
version(-1) {}
+
+ AlterTableStatus status;
+ SchemaHash schema_hash;
+ int32_t version;
+};
+
class Tablet : public std::enable_shared_from_this<Tablet> {
public:
+ static TabletSharedPtr create_from_header_file(
+ int64_t tablet_id,
+ int64_t schema_hash,
+ const std::string& header_file,
+ DataDir* data_dir = nullptr);
+ static TabletSharedPtr create_from_header(
+ TabletMeta* meta,
+ DataDir* data_dir = nullptr);
+
Tablet(TabletMeta* tablet_meta, DataDir* data_dir);
~Tablet();
+ OLAPStatus load();
+ bool is_loaded();
+ OLAPStatus load_indices();
+ OLAPStatus save_tablet_meta();
+
+ OLAPStatus select_versions_to_span(const Version& version,
+ std::vector<Version>* span_versions)
const;
+ void acquire_data_sources(const Version& version,
std::vector<ColumnData*>* sources) const;
+ void acquire_data_sources_by_versions(const std::vector<Version>&
version_list,
+ std::vector<ColumnData*>* sources)
const;
+ OLAPStatus release_data_sources(std::vector<ColumnData*>* data_sources)
const;
+ OLAPStatus unregister_data_source(const Version& version,
std::vector<SegmentGroup*>* segment_group_vec);
+ OLAPStatus add_pending_version(int64_t partition_id, int64_t
transaction_id,
+ const std::vector<std::string>*
delete_conditions);
+ OLAPStatus add_pending_segment_group(SegmentGroup* segment_group);
+ OLAPStatus add_pending_data(SegmentGroup* segment_group, const
std::vector<TCondition>* delete_conditions);
+ bool has_pending_data(int64_t transaction_id);
+ void delete_pending_data(int64_t transaction_id);
+ void get_expire_pending_data(std::vector<int64_t>* transaction_ids);
+ void delete_expire_incremental_data();
+ void load_pending_data();
+ OLAPStatus publish_version(int64_t transaction_id, Version version,
VersionHash version_hash);
+ const PDelta* get_incremental_delta(Version version) const;
+ void get_missing_versions_with_meta_locked(
+ int64_t until_version, std::vector<Version>* missing_versions)
const;
+ OLAPStatus is_push_for_delete(int64_t transaction_id, bool*
is_push_for_delete) const;
+ OLAPStatus clone_data(const TabletMeta& clone_header,
+ const std::vector<const PDelta*>& clone_deltas,
+ const std::vector<Version>& versions_to_delete);
+ OLAPStatus replace_data_sources(const std::vector<Version>* old_versions,
+ const std::vector<SegmentGroup*>*
new_data_sources,
+ std::vector<SegmentGroup*>* old_data_sources);
+ OLAPStatus compute_all_versions_hash(const std::vector<Version>& versions,
+ VersionHash* version_hash) const;
+ OLAPStatus merge_tablet_meta(const TabletMeta& hdr, int to_version);
+ bool has_version(const Version& version) const;
+ void list_versions(std::vector<Version>* versions) const;
+ void list_version_entities(std::vector<VersionEntity>* version_entities)
const;
+ void mark_dropped();
+ bool is_dropped();
+ void delete_all_files();
+ void obtain_header_rdlock() { _meta_lock.rdlock(); }
+ void obtain_header_wrlock() { _meta_lock.wrlock(); }
+ void release_header_lock() { _meta_lock.unlock(); }
+ RWMutex* get_header_lock_ptr() { return &_meta_lock; }
+ void obtain_push_lock() { _ingest_lock.lock(); }
+ void release_push_lock() { _ingest_lock.unlock(); }
+ Mutex* get_push_lock() { return &_ingest_lock; }
+ bool try_base_compaction_lock() { return _base_lock.trylock() ==
OLAP_SUCCESS; }
+ void obtain_base_compaction_lock() { _base_lock.lock(); }
+ void release_base_compaction_lock() { _base_lock.unlock(); }
+ bool try_cumulative_lock() { return (OLAP_SUCCESS ==
_cumulative_lock.trylock()); }
+ void obtain_cumulative_lock() { _cumulative_lock.lock(); }
+ void release_cumulative_lock() { _cumulative_lock.unlock(); }
+ std::string construct_index_file_path(const Version& version,
+ VersionHash version_hash,
+ int32_t segment_group_id, int32_t
segment) const;
+ std::string construct_data_file_path(const Version& version,
+ VersionHash version_hash,
+ int32_t segment_group_id, int32_t
segment) const;
+ static std::string construct_file_path(const std::string& tablet_path,
+ const Version& version,
+ VersionHash version_hash,
+ int32_t segment_group_id, int32_t
segment,
+ const std::string& suffix);
+ std::string construct_pending_data_dir_path() const;
+ std::string construct_pending_index_file_path(
+ TTransactionId transaction_id, int32_t segment_group_id, int32_t
segment) const;
+ std::string construct_pending_data_file_path(
+ TTransactionId transaction_id, int32_t segment_group_id, int32_t
segment) const;
+ std::string construct_incremental_delta_dir_path() const;
+ std::string construct_incremental_index_file_path(
+ Version version, VersionHash version_hash, int32_t segment_group_id,
int32_t segment) const;
+ std::string construct_incremental_data_file_path(
+ Version version, VersionHash version_hash, int32_t segment_group_id,
int32_t segment) const;
+ std::string construct_dir_path() const;
+ std::vector<FieldInfo>& tablet_schema();
+ int file_delta_size() const;
+ const PDelta& delta(int index) const;
+ const PDelta* get_delta(int index) const;
+ const PDelta* lastest_delta() const;
+ const PDelta* lastest_version() const;
+ const PDelta* least_complete_version(
+ const std::vector<Version>& missing_versions) const;
+ const PDelta* base_version() const;
+ const uint32_t get_cumulative_compaction_score() const;
+ const uint32_t get_base_compaction_score() const;
+ const OLAPStatus delete_version(const Version& version);
+ DataFileType data_file_type() const;
+ int delete_data_conditions_size() const;
+ DeletePredicatePB* add_delete_data_conditions();
+ const google::protobuf::RepeatedPtrField<DeletePredicatePB>&
delete_data_conditions();
+ KeysType keys_type() const;
+ bool is_delete_data_version(Version version);
+ bool is_load_delete_version(Version version);
+ const int64_t creation_time() const;
+ void set_creation_time(int64_t time_seconds);
+ const int32_t cumulative_layer_point() const;
+ void set_cumulative_layer_point(const int32_t new_point);
+ bool is_schema_changing();
+ bool get_schema_change_request(TTabletId* tablet_id,
+ SchemaHash* schema_hash,
+ std::vector<Version>* versions_to_changed,
+ AlterTabletType* alter_table_type) const;
+ void set_schema_change_request(int64_t tablet_id,
+ int64_t schema_hash,
+ const std::vector<Version>&
versions_to_changed,
+ const AlterTabletType alter_table_type);
+ bool remove_last_schema_change_version(TabletSharedPtr new_olap_table);
+ void clear_schema_change_request();
+ SchemaChangeStatus schema_change_status();
+ void set_schema_change_status(AlterTableStatus status,
+ SchemaHash schema_hash,
+ int32_t version);
+ bool equal(int64_t tablet_id, int64_t schema_hash);
+ bool is_used();
+ std::string storage_root_path_name();
+ std::string tablet_path();
+ std::string get_field_name_by_index(uint32_t index);
+ FieldType get_field_type_by_index(uint32_t index);
+ FieldAggregationMethod get_aggregation_by_index(uint32_t index);
+ OLAPStatus test_version(const Version& version);
+ VersionEntity get_version_entity_by_version(const Version& version);
+ size_t get_version_index_size(const Version& version);
+ size_t get_version_data_size(const Version& version);
+ OLAPStatus recover_tablet_until_specfic_version(const int64_t&
until_version,
+ const int64_t&
version_hash);
+
+
+
+ OLAPStatus capture_consistent_rowsets(const Version& spec_version,
+
vector<std::shared_ptr<RowsetReader>>* rs_readers);
+ void acquire_rs_reader_by_version(const vector<Version>& version_vec,
+ vector<std::shared_ptr<RowsetReader>>*
rs_readers) const;
+ OLAPStatus release_rs_readers(vector<std::shared_ptr<RowsetReader>>*
rs_readers) const;
+ OLAPStatus modify_rowsets(vector<RowsetSharedPtr>& to_add,
+ vector<RowsetSharedPtr>& to_delete);
+
const int64_t table_id() const;
const std::string table_name() const;
const int64_t partition_id() const;
@@ -57,7 +214,6 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
const int64_t schema_hash() const;
const int16_t shard_id();
DataDir* data_dir() const;
- KeysType keys_type() const;
double bloom_filter_fpp() const;
bool equal(TTabletId tablet_id, TSchemaHash schema_hash);
@@ -73,12 +229,9 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
size_t get_field_index(const std::string& field_name) const;
size_t get_row_size() const;
+ size_t get_index_size() const;
size_t all_rowsets_size() const;
- size_t num_rows() const;
- FieldType get_field_type_by_index(size_t index);
- FieldAggregationMethod get_aggregation_by_index(size_t index);
- OLAPStatus test_version(const Version& version);
- VersionEntity get_version_entity_by_version(const Version& version);
+ size_t get_num_rows() const;
size_t get_rowset_size(const Version& version);
AlterTabletState alter_tablet_state();
@@ -87,8 +240,6 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
const RowsetSharedPtr get_rowset(int index) const;
const RowsetSharedPtr lastest_rowset() const;
OLAPStatus all_rowsets(vector<RowsetSharedPtr> rowsets);
- OLAPStatus modify_rowsets(vector<RowsetSharedPtr>& to_add,
- vector<RowsetSharedPtr>& to_delete);
OLAPStatus add_inc_rowset(const Rowset& rowset);
RowsetSharedPtr get_inc_rowset(const Version& version) const;
@@ -97,27 +248,14 @@ class Tablet : public std::enable_shared_from_this<Tablet>
{
OLAPStatus is_deletion_rowset(const Version& version) const;
OLAPStatus create_snapshot();
- OLAPStatus capture_consistent_rowsets(const Version& spec_version,
vector<std::shared_ptr<RowsetReader>>* rs_readers);
- void acquire_rs_reader_by_version(const vector<Version>& version_vec,
- vector<std::shared_ptr<RowsetReader>>*
rs_readers) const;
- OLAPStatus release_rs_readers(vector<std::shared_ptr<RowsetReader>>*
rs_readers) const;
RWMutex* meta_lock();
Mutex* ingest_lock();
Mutex* base_lock();
Mutex* cumulative_lock();
- bool has_version(const Version& version) const;
- void list_versions(vector<Version>* versions) const;
void calc_missed_versions(int64_t spec_version, vector<Version>*
missed_versions) const;
- // versions in [0, m_cumulative_layer_point) is base and cumulative
versions;
- // versions in [m_cumulative_layer_point, newest_delta_version] is delta
versons;
- // 在使用之前对header加锁
- const int32_t cumulative_layer_point() const;
- void set_cumulative_layer_point(const int32_t new_point);
- const size_t get_cumulative_compaction_score() const;
- const size_t get_base_compaction_score() const;
size_t deletion_rowset_size();
bool can_do_compaction();
@@ -141,11 +279,6 @@ class Tablet : public std::enable_shared_from_this<Tablet>
{
uint32_t segment_size() const;
void set_io_error();
- bool is_used();
- bool is_schema_changing();
- OLAPStatus recover_tablet_until_specfic_version(const int64_t&
spec_version);
-
- size_t get_version_data_size(const Version& version);
RowsetSharedPtr rowset_with_largest_size();
public:
DataDir* _data_dir;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 533e1bf4..0dfb2542 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -60,19 +60,19 @@ OLAPStatus TabletMeta::deserialize_unlock(const string&
meta_binary) {
// generate TabletState
switch (_tablet_meta_pb.tablet_state()) {
case PB_NOTREADY:
- _tablet_state = TabletState::NOTREADY;
+ _tablet_state = TabletState::TABLET_NOTREADY;
break;
case PB_RUNNING:
- _tablet_state = TabletState::RUNNING;
+ _tablet_state = TabletState::TABLET_RUNNING;
break;
case PB_TOMBSTONED:
- _tablet_state = TabletState::TOMBSTONED;
+ _tablet_state = TabletState::TABLET_TOMBSTONED;
break;
case PB_STOPPED:
- _tablet_state = TabletState::STOPPED;
+ _tablet_state = TabletState::TABLET_STOPPED;
break;
case PB_SHUTDOWN:
- _tablet_state = TabletState::SHUTDOWN;
+ _tablet_state = TabletState::TABLET_SHUTDOWN;
break;
default:
LOG(WARNING) << "tablet has no state. tablet=" << _tablet_id
@@ -115,12 +115,12 @@ OLAPStatus TabletMeta::to_tablet_pb_unlock(TabletMetaPB*
tablet_meta_pb) {
tablet_meta_pb->set_tablet_name(_tablet_name);
for (auto rs : _rs_metas) {
- rs.to_rowset_pb(pb.add_rs_meta());
+ rs.to_rowset_pb(tablet_meta_pb-.add_rs_meta());
}
for (auto rs : _inc_rs_metas) {
- rs.to_rowset_pb(pb.add_inc_rc_meta());
+ rs.to_rowset_pb(tablet_meta_pb.add_inc_rc_meta());
}
- _schema.to_schema_pb(pb.mutable_schema());
+ _schema.to_schema_pb(tablet_meta_pb.mutable_schema());
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 297c1484..bf328bff 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -34,18 +34,18 @@ using std::vector;
namespace doris {
enum TabletState {
- NOTREADY,
- RUNNING,
- TOMBSTONED,
- STOPPED,
- SHUTDOWN
+ TABLET_NOTREADY,
+ TABLET_RUNNING,
+ TABLET_TOMBSTONED,
+ TABLET_STOPPED,
+ TABLET_SHUTDOWN
};
enum AlterTabletState {
- NONE,
- ALTERING,
- FINISHED,
- FAILED
+ ALTER_NONE,
+ ALTER_ALTERING,
+ ALTER_FINISHED,
+ ALTER_FAILED
};
class Rowset;
@@ -65,22 +65,27 @@ class AlterTabletTask {
inline int64_t related_tablet_id() { return _related_tablet_id; }
inline int64_t related_schema_hash() { return _related_schema_hash; }
- vector<RowsetMeta>& rowsets_to_alter() { return _rowsets_to_alter; }
+ vector<RowsetMetaSharedPtr>& rowsets_to_alter() { return
_rowsets_to_alter; }
const AlterTabletState& alter_state() const { return _alter_state; }
const AlterTabletType& alter_type() const { return _alter_type; }
private:
int64_t _related_tablet_id;
int64_t _related_schema_hash;
- vector<RowsetMeta> _rowsets_to_alter;
+ vector<RowsetMetaSharedPtr> _rowsets_to_alter;
AlterTabletState _alter_state;
AlterTabletType _alter_type;
};
class TabletMeta {
public:
+ TabletMeta(const std::string& file_name);
TabletMeta(DataDir* data_dir);
+ OLAPStatus load_and_init();
+ FileVersionMessage& file_version(int32_t index);
+ int file_version_size();
+ OLAPStatus set_shard(int32_t shard_id);
OLAPStatus serialize(string* meta_binary);
OLAPStatus serialize_unlock(string* meta_binary);
@@ -93,13 +98,13 @@ class TabletMeta {
OLAPStatus to_tablet_pb(TabletMetaPB* tablet_meta_pb);
OLAPStatus to_tablet_pb_unlock(TabletMetaPB* tablet_meta_pb);
- OLAPStatus add_inc_rs_meta(const RowsetMeta& rs_meta);
+ OLAPStatus add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta);
OLAPStatus delete_inc_rs_meta_by_version(const Version& version);
const RowsetMetaSharedPtr get_inc_rs_meta(const Version& version) const;
DeletePredicatePB* add_delete_predicates();
- const std::vector<RowsetMeta>& all_inc_rs_metas() const;
- const std::vector<RowsetMeta>& all_rs_metas() const;
+ const std::vector<RowsetMetaSharedPtr>& all_inc_rs_metas() const;
+ const std::vector<RowsetMetaSharedPtr>& all_rs_metas() const;
OLAPStatus modify_rowsets(const vector<RowsetMetaSharedPtr>& to_add,
const vector<RowsetMetaSharedPtr>& to_delete);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 853af2be..fa56c20c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -276,4 +276,4 @@ message OLAPDataHeaderMessage {
message OLAPRawDeltaHeaderMessage {
required int32 schema_hash = 2;
-}
\ No newline at end of file
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]