chaoyli closed pull request #482: Fix tablet compilation error
URL: https://github.com/apache/incubator-doris/pull/482
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index c6314f8c..651e0c15 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -56,7 +56,7 @@ add_library(Olap STATIC
reader.cpp
row_block.cpp
row_cursor.cpp
- schema_change.cpp
+ #schema_change.cpp
serialize.cpp
storage_engine.cpp
data_dir.cpp
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 0b57f11f..d995c82c 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -63,7 +63,7 @@ OLAPStatus Merger::merge(const vector<ColumnData*>&
olap_data_arr,
// create and initiate writer for generating new index and data files.
unique_ptr<ColumnDataWriter>
writer(ColumnDataWriter::create(_segment_group, false,
_tablet->compress_kind(), _tablet->bloom_filter_fpp()));
- DCHECK(_writer != nullptr) << "memory error occur when creating writer";
+ DCHECK(writer != nullptr) << "memory error occur when creating writer";
if (NULL == writer) {
OLAP_LOG_WARNING("fail to allocate writer.");
diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp
index a45c3ec1..f23951f5 100644
--- a/be/src/olap/olap_snapshot.cpp
+++ b/be/src/olap/olap_snapshot.cpp
@@ -359,7 +359,7 @@ OLAPStatus StorageEngine::_create_snapshot_files(
// load tablet header, in order to remove versions that not in
shortest version path
DataDir* store = ref_tablet->data_dir();
- new_tablet_meta = new(nothrow) TabletMeta();
+ new_tablet_meta = new(nothrow) TabletMeta(store);
if (new_tablet_meta == NULL) {
OLAP_LOG_WARNING("fail to malloc TabletMeta.");
res = OLAP_ERR_MALLOC_ERROR;
@@ -479,7 +479,7 @@ OLAPStatus
StorageEngine::_create_incremental_snapshot_files(
do {
// save header to snapshot path
TabletMeta tablet_meta;
- res = TabletMetaManager::get_header(ref_tablet->store(),
+ res = TabletMetaManager::get_header(ref_tablet->data_dir(),
ref_tablet->tablet_id(), ref_tablet->schema_hash(),
&tablet_meta);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to load header. res=" << res << "tablet_id="
@@ -560,7 +560,7 @@ OLAPStatus StorageEngine::_append_single_delta(
const TSnapshotRequest& request, DataDir* store) {
OLAPStatus res = OLAP_SUCCESS;
string root_path = store->path();
- TabletMeta* new_tablet_meta = new(nothrow) TabletMeta();
+ TabletMeta* new_tablet_meta = new(nothrow) TabletMeta(store);
if (new_tablet_meta == NULL) {
OLAP_LOG_WARNING("fail to malloc TabletMeta.");
return OLAP_ERR_MALLOC_ERROR;
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 26b060c0..40cb3a0e 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -168,7 +168,7 @@ OLAPStatus PushHandler::process_realtime_push(
DeleteConditionHandler del_cond_handler;
tablet_var.tablet->obtain_header_rdlock();
for (const TCondition& cond : request.delete_conditions) {
- res =
del_cond_handler.check_condition_valid(tablet_var.tablet, cond);
+ res =
del_cond_handler.check_condition_valid(tablet_var.tablet->tablet_schema(),
cond);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to check delete condition.
[table=%s res=%d]",
tablet_var.tablet->full_name().c_str(),
res);
@@ -345,11 +345,22 @@ OLAPStatus PushHandler::_convert(
}
delta_segment_group = new(std::nothrow) SegmentGroup(
- curr_tablet.get(), (_request.push_type ==
TPushType::LOAD_DELETE),
+ curr_tablet->tablet_id(),
+ curr_tablet->tablet_schema(),
+ curr_tablet->num_key_fields(),
+ curr_tablet->num_short_key_fields(),
+ curr_tablet->num_rows_per_row_block(),
+ curr_tablet->rowset_path_prefix(),
+ (_request.push_type == TPushType::LOAD_DELETE),
0, 0, true, _request.partition_id, _request.transaction_id);
} else {
delta_segment_group = new(std::nothrow) SegmentGroup(
- curr_tablet.get(),
+ curr_tablet->tablet_id(),
+ curr_tablet->tablet_schema(),
+ curr_tablet->num_key_fields(),
+ curr_tablet->num_short_key_fields(),
+ curr_tablet->num_rows_per_row_block(),
+ curr_tablet->rowset_path_prefix(),
Version(_request.version, _request.version),
_request.version_hash,
(_request.push_type == TPushType::LOAD_DELETE),
@@ -368,7 +379,9 @@ OLAPStatus PushHandler::_convert(
VLOG(3) << "init writer. tablet=" << curr_tablet->full_name()
<< ", block_row_size=" <<
curr_tablet->num_rows_per_row_block();
- if (NULL == (writer = ColumnDataWriter::create(curr_tablet,
delta_segment_group, true))) {
+ if (NULL == (writer = ColumnDataWriter::create(delta_segment_group,
true,
+
curr_tablet->compress_kind(),
+
curr_tablet->bloom_filter_fpp()))) {
OLAP_LOG_WARNING("fail to create writer. [tablet='%s']",
curr_tablet->full_name().c_str());
res = OLAP_ERR_MALLOC_ERROR;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 49876830..a2f99106 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -73,7 +73,7 @@ class CollectIterator {
}
OLAPStatus init() {
- auto res =
_row_cursor.init(_data->segment_group()->tablet()->tablet_schema());
+ auto res = _row_cursor.init(_data->tablet()->tablet_schema());
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to init row cursor, res=" << res;
return res;
@@ -805,7 +805,7 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams&
read_params) {
OLAPStatus Reader::_init_conditions_param(const ReaderParams& read_params) {
OLAPStatus res = OLAP_SUCCESS;
- _conditions.set_tablet(_tablet);
+ _conditions.set_tablet_schema(_tablet->tablet_schema());
for (int i = 0; i < read_params.conditions.size(); ++i) {
_conditions.append_condition(read_params.conditions[i]);
ColumnPredicate* predicate =
_parse_to_predicate(read_params.conditions[i]);
@@ -1120,7 +1120,9 @@ OLAPStatus Reader::_init_load_bf_columns(const
ReaderParams& read_params) {
OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) {
if (read_params.reader_type != READER_CUMULATIVE_COMPACTION) {
_tablet->obtain_header_rdlock();
- OLAPStatus ret = _delete_handler.init(_tablet,
read_params.version.second);
+ OLAPStatus ret = _delete_handler.init(_tablet->tablet_schema(),
+ _tablet->delete_predicates(),
+ read_params.version.second);
_tablet->release_header_lock();
return ret;
diff --git a/be/src/olap/rowset/column_data.h b/be/src/olap/rowset/column_data.h
index 453d7a10..47ffaa2d 100644
--- a/be/src/olap/rowset/column_data.h
+++ b/be/src/olap/rowset/column_data.h
@@ -121,6 +121,7 @@ class ColumnData {
SegmentGroup* segment_group() const { return _segment_group; }
void set_segment_group(SegmentGroup* segment_group) { _segment_group =
segment_group; }
int64_t num_rows() const { return _segment_group->num_rows(); }
+ Tablet* tablet() const { return _tablet; }
private:
DISALLOW_COPY_AND_ASSIGN(ColumnData);
@@ -260,4 +261,4 @@ class ColumnDataComparator {
} // namespace doris
-#endif // DORIS_BE_SRC_OLAP_ROWSET_COLUMN_DATA_H
\ No newline at end of file
+#endif // DORIS_BE_SRC_OLAP_ROWSET_COLUMN_DATA_H
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 33fe08bb..14f98f55 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H
+#include "gen_cpp/olap_file.pb.h"
#include "olap/new_status.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/rowset_builder.h"
@@ -42,6 +43,9 @@ class Rowset {
virtual NewStatus copy(RowsetBuilder* dest_rowset_builder) = 0;
virtual NewStatus remove() = 0;
+
+ virtual OLAPStatus to_rowset_pb(const RowsetMetaPB& rs_meta);
+ virtual OLAPStatus get_rs_meta();
};
} // namespace doris
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 8238f46c..1228441c 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -46,6 +46,8 @@ class RowsetMeta {
return ret;
}
+ virtual bool init_from_pb(const RowsetMetaPB& rs_meta_pb);
+
virtual bool deserialize_extra_properties() {
return true;
}
diff --git a/be/src/olap/rowset/segment_group.cpp
b/be/src/olap/rowset/segment_group.cpp
index b880aec6..284f87d2 100644
--- a/be/src/olap/rowset/segment_group.cpp
+++ b/be/src/olap/rowset/segment_group.cpp
@@ -61,7 +61,7 @@ namespace doris {
} while (0);
SegmentGroup::SegmentGroup(int64_t tablet_id, const RowFields& tablet_schema,
int num_key_fields, int num_short_key_fields,
- size_t num_rows_per_row_block, std::string rowset_path_prefix,
Version version, VersionHash version_hash,
+ size_t num_rows_per_row_block, const std::string&
rowset_path_prefix, Version version, VersionHash version_hash,
bool delete_flag, int32_t segment_group_id, int32_t num_segments)
: _tablet_id(tablet_id),
_tablet_schema(tablet_schema),
@@ -99,7 +99,7 @@ SegmentGroup::SegmentGroup(int64_t tablet_id, const
RowFields& tablet_schema, in
}
SegmentGroup::SegmentGroup(int64_t tablet_id, const RowFields& tablet_schema,
int num_key_fields, int num_short_key_fields,
- size_t num_rows_per_row_block, std::string rowset_path_prefix, bool
delete_flag,
+ size_t num_rows_per_row_block, const std::string& rowset_path_prefix,
bool delete_flag,
int32_t segment_group_id, int32_t num_segments, bool is_pending,
TPartitionId partition_id, TTransactionId transaction_id) :
_tablet_id(tablet_id),
_tablet_schema(tablet_schema),
@@ -706,4 +706,4 @@ int64_t SegmentGroup::get_tablet_id() {
return _tablet_id;
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_group.h
b/be/src/olap/rowset/segment_group.h
index 989e1dab..ee169bd7 100644
--- a/be/src/olap/rowset/segment_group.h
+++ b/be/src/olap/rowset/segment_group.h
@@ -48,11 +48,11 @@ class SegmentGroup {
friend class MemIndex;
public:
SegmentGroup(int64_t tablet_id, const RowFields& tablet_schema, int
num_key_fields, int num_short_key_fields,
- size_t num_rows_per_row_block, std::string tablet_path_prefix,
Version version,
+ size_t num_rows_per_row_block, const std::string&
rowset_path_prefix, Version version,
VersionHash version_hash, bool delete_flag, int segment_group_id,
int32_t num_segments);
SegmentGroup(int64_t tablet_id, const RowFields& tablet_schema, int
num_key_fields, int num_short_key_fields,
- size_t num_rows_per_row_block, std::string tablet_path_prefix,
bool delete_flag,
+ size_t num_rows_per_row_block, const std::string&
rowset_path_prefix, bool delete_flag,
int32_t segment_group_id, int32_t num_segments, bool is_pending,
TPartitionId partition_id, TTransactionId transaction_id);
@@ -227,6 +227,7 @@ class SegmentGroup {
std::string construct_index_file_path(int32_t segment) const;
std::string construct_data_file_path(int32_t segment) const;
+ size_t current_num_rows_per_row_block() const;
void publish_version(Version version, VersionHash version_hash);
const RowFields& get_tablet_schema();
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index e9e9d42e..b1d9361c 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -29,7 +29,7 @@
#include "olap/tablet.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
#include "olap/wrapper_field.h"
#include "common/resource_tls.h"
#include "agent/cgroups_mgr.h"
@@ -679,10 +679,11 @@ bool
SchemaChangeDirectly::_write_row_block(ColumnDataWriter* writer, RowBlock*
return true;
}
-bool LinkedSchemaChange::process(ColumnData* olap_data, SegmentGroup*
new_segment_group) {
+bool LinkedSchemaChange::process(ColumnData* olap_data, SegmentGroup*
new_segment_group,
+ TabletSharedPtr tablet) {
for (size_t i = 0; i < olap_data->segment_group()->num_segments(); ++i) {
- string index_path =
new_segment_group->construct_index_file_path(new_segment_group->segment_group_id(),
i);
- string base_tablet_index_path =
olap_data->segment_group()->construct_index_file_path(olap_data->segment_group()->segment_group_id(),
i);
+ string index_path = new_segment_group->construct_index_file_path(i);
+ string base_tablet_index_path =
olap_data->segment_group()->construct_index_file_path(i);
if (link(base_tablet_index_path.c_str(), index_path.c_str()) == 0) {
VLOG(3) << "success to create hard link. from_path=" <<
base_tablet_index_path
<< ", to_path=" << index_path;
@@ -693,8 +694,8 @@ bool LinkedSchemaChange::process(ColumnData* olap_data,
SegmentGroup* new_segmen
return false;
}
- string data_path =
new_segment_group->construct_data_file_path(new_segment_group->segment_group_id(),
i);
- string base_tablet_data_path =
olap_data->segment_group()->construct_data_file_path(olap_data->segment_group()->segment_group_id(),
i);
+ string data_path = new_segment_group->construct_data_file_path(i);
+ string base_tablet_data_path =
olap_data->segment_group()->construct_data_file_path(i);
if (link(base_tablet_data_path.c_str(), data_path.c_str()) == 0) {
VLOG(3) << "success to create hard link. from_path=" <<
base_tablet_data_path
<< ", to_path=" << data_path;
@@ -721,8 +722,9 @@ bool LinkedSchemaChange::process(ColumnData* olap_data,
SegmentGroup* new_segmen
return true;
}
-bool SchemaChangeDirectly::process(ColumnData* olap_data, SegmentGroup*
new_segment_group) {
- DataFileType data_file_type =
new_segment_group->tablet()->data_file_type();
+bool SchemaChangeDirectly::process(ColumnData* olap_data, SegmentGroup*
new_segment_group,
+ TabletSharedPtr tablet) {
+ DataFileType data_file_type = tablet->data_file_type();
bool null_supported = true;
if (NULL == _row_block_allocator) {
@@ -777,8 +779,8 @@ bool SchemaChangeDirectly::process(ColumnData* olap_data,
SegmentGroup* new_segm
}
if (need_create_empty_version) {
- res = create_init_version(new_segment_group->tablet()->tablet_id(),
- new_segment_group->tablet()->schema_hash(),
+ res = create_init_version(tablet->tablet_id(),
+ tablet->schema_hash(),
new_segment_group->version(),
new_segment_group->version_hash(),
new_segment_group);
@@ -794,7 +796,9 @@ bool SchemaChangeDirectly::process(ColumnData* olap_data,
SegmentGroup* new_segm
<< "block_row_size=" << _tablet->num_rows_per_row_block();
bool result = true;
RowBlock* new_row_block = NULL;
- ColumnDataWriter* writer = ColumnDataWriter::create(_tablet,
new_segment_group, false);
+ ColumnDataWriter* writer = ColumnDataWriter::create(new_segment_group,
false,
+
_tablet->compress_kind(),
+
_tablet->bloom_filter_fpp());
if (NULL == writer) {
OLAP_LOG_WARNING("failed to create writer.");
result = false;
@@ -910,7 +914,8 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() {
SAFE_DELETE(_row_block_allocator);
}
-bool SchemaChangeWithSorting::process(ColumnData* olap_data, SegmentGroup*
new_segment_group) {
+bool SchemaChangeWithSorting::process(ColumnData* olap_data, SegmentGroup*
new_segment_group,
+ TabletSharedPtr tablet) {
if (NULL == _row_block_allocator) {
if (NULL == (_row_block_allocator = new(nothrow) RowBlockAllocator(
_tablet->tablet_schema(), _memory_limitation))) {
@@ -919,7 +924,7 @@ bool SchemaChangeWithSorting::process(ColumnData*
olap_data, SegmentGroup* new_s
}
}
- DataFileType data_file_type =
new_segment_group->tablet()->data_file_type();
+ DataFileType data_file_type = tablet->data_file_type();
bool null_supported = true;
RowBlock* ref_row_block = NULL;
@@ -940,8 +945,8 @@ bool SchemaChangeWithSorting::process(ColumnData*
olap_data, SegmentGroup* new_s
}
if (need_create_empty_version) {
- res = create_init_version(new_segment_group->tablet()->tablet_id(),
- new_segment_group->tablet()->schema_hash(),
+ res = create_init_version(tablet->tablet_id(),
+ tablet->schema_hash(),
new_segment_group->version(),
new_segment_group->version_hash(),
new_segment_group);
@@ -1068,7 +1073,7 @@ bool SchemaChangeWithSorting::process(ColumnData*
olap_data, SegmentGroup* new_s
}
// TODO(zyh): 如果_temp_delta_versions只有一个,不需要再外排
- if (!_external_sorting(olap_segment_groups, new_segment_group)) {
+ if (!_external_sorting(olap_segment_groups, new_segment_group, tablet)) {
OLAP_LOG_WARNING("failed to sorting externally.");
result = false;
goto SORTING_PROCESS_ERR;
@@ -1116,11 +1121,17 @@ bool SchemaChangeWithSorting::_internal_sorting(const
vector<RowBlock*>& row_blo
uint64_t merged_rows = 0;
RowBlockMerger merger(_tablet);
- (*temp_segment_group) = new(nothrow) SegmentGroup(_tablet.get(),
- temp_delta_versions,
- rand(),
- false,
- 0, 0);
+ (*temp_segment_group) =
+ new(nothrow) SegmentGroup(_tablet->tablet_id(),
+ _tablet->tablet_schema(),
+ _tablet->num_key_fields(),
+ _tablet->num_short_key_fields(),
+ _tablet->num_rows_per_row_block(),
+ _tablet->rowset_path_prefix(),
+ temp_delta_versions,
+ rand(),
+ false,
+ 0, 0);
if (NULL == (*temp_segment_group)) {
OLAP_LOG_WARNING("failed to malloc SegmentGroup. [size=%ld]",
sizeof(SegmentGroup));
goto INTERNAL_SORTING_ERR;
@@ -1128,7 +1139,8 @@ bool SchemaChangeWithSorting::_internal_sorting(const
vector<RowBlock*>& row_blo
VLOG(3) << "init writer. tablet=" << _tablet->full_name()
<< ", block_row_size=" << _tablet->num_rows_per_row_block();
- writer = ColumnDataWriter::create(_tablet, *temp_segment_group, false);
+ writer = ColumnDataWriter::create(*temp_segment_group, false,
+ _tablet->compress_kind(),
_tablet->bloom_filter_fpp());
if (NULL == writer) {
OLAP_LOG_WARNING("failed to create writer.");
goto INTERNAL_SORTING_ERR;
@@ -1158,7 +1170,8 @@ bool SchemaChangeWithSorting::_internal_sorting(const
vector<RowBlock*>& row_blo
bool SchemaChangeWithSorting::_external_sorting(
vector<SegmentGroup*>& src_segment_groups,
- SegmentGroup* dest_segment_group) {
+ SegmentGroup* dest_segment_group,
+ TabletSharedPtr tablet) {
Merger merger(_tablet, dest_segment_group, READER_ALTER_TABLE);
uint64_t merged_rows = 0;
@@ -1179,7 +1192,7 @@ bool SchemaChangeWithSorting::_external_sorting(
OLAP_LOG_WARNING("fail to initial olap data. [version='%d-%d'
tablet='%s']",
(*it)->version().first,
(*it)->version().second,
- (*it)->tablet()->full_name().c_str());
+ tablet->full_name().c_str());
goto EXTERNAL_SORTING_ERR;
}
}
@@ -1572,7 +1585,7 @@ OLAPStatus SchemaChangeHandler::_do_alter_tablet(
}
}
- res = delete_handler.init(ref_tablet, end_version);
+ res = delete_handler.init(new_tablet->tablet_schema(),
new_tablet->delete_predicates(), end_version);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("init delete handler failed. [tablet=%s;
end_version=%d]",
ref_tablet->full_name().c_str(), end_version);
@@ -1802,18 +1815,28 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
SegmentGroup* new_segment_group = nullptr;
if ((*it)->transaction_id() == 0) {
- new_segment_group = new SegmentGroup(dest_tablet.get(),
- olap_data->version(),
- olap_data->version_hash(),
- olap_data->delete_flag(),
- (*it)->segment_group_id(), 0);
+ new_segment_group = new SegmentGroup(dest_tablet->tablet_id(),
+ dest_tablet->tablet_schema(),
+ dest_tablet->num_key_fields(),
+
dest_tablet->num_short_key_fields(),
+
dest_tablet->num_rows_per_row_block(),
+
dest_tablet->rowset_path_prefix(),
+ olap_data->version(),
+ olap_data->version_hash(),
+ olap_data->delete_flag(),
+ (*it)->segment_group_id(), 0);
} else {
- new_segment_group = new SegmentGroup(dest_tablet.get(),
- olap_data->delete_flag(),
- (*it)->segment_group_id(), 0,
- (*it)->is_pending(),
- (*it)->partition_id(),
- (*it)->transaction_id());
+ new_segment_group = new SegmentGroup(dest_tablet->tablet_id(),
+ dest_tablet->tablet_schema(),
+ dest_tablet->num_key_fields(),
+
dest_tablet->num_short_key_fields(),
+
dest_tablet->num_rows_per_row_block(),
+
dest_tablet->rowset_path_prefix(),
+ olap_data->delete_flag(),
+ (*it)->segment_group_id(), 0,
+ (*it)->is_pending(),
+ (*it)->partition_id(),
+ (*it)->transaction_id());
}
if (NULL == new_segment_group) {
@@ -1824,11 +1847,11 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
new_segment_groups->push_back(new_segment_group);
- if (!sc_procedure->process(olap_data, new_segment_group)) {
+ if (!sc_procedure->process(olap_data, new_segment_group, dest_tablet))
{
if ((*it)->is_pending()) {
OLAP_LOG_WARNING("failed to process the transaction when
schema change. "
"[tablet='%s' transaction=%ld]",
- (*it)->tablet()->full_name().c_str(),
+ dest_tablet->full_name().c_str(),
(*it)->transaction_id());
} else {
OLAP_LOG_WARNING("failed to process the version.
[version='%d-%d']",
@@ -2027,7 +2050,12 @@ OLAPStatus
SchemaChangeHandler::_alter_tablet(SchemaChangeParams* sc_params) {
// we create a new delta with the same version as the ColumnData
processing currently.
SegmentGroup* new_segment_group = new(nothrow) SegmentGroup(
- sc_params->new_tablet.get(),
+ sc_params->new_tablet->tablet_id(),
+
sc_params->new_tablet->tablet_schema(),
+
sc_params->new_tablet->num_key_fields(),
+
sc_params->new_tablet->num_short_key_fields(),
+
sc_params->new_tablet->num_rows_per_row_block(),
+
sc_params->new_tablet->rowset_path_prefix(),
(*it)->version(),
(*it)->version_hash(),
(*it)->delete_flag(),
@@ -2044,8 +2072,8 @@ OLAPStatus
SchemaChangeHandler::_alter_tablet(SchemaChangeParams* sc_params) {
if (DEL_SATISFIED == del_ret) {
VLOG(3) << "filter delta in schema change:"
<< (*it)->version().first << "-" <<
(*it)->version().second;
- res =
sc_procedure->create_init_version(new_segment_group->tablet()->tablet_id(),
-
new_segment_group->tablet()->schema_hash(),
+ res =
sc_procedure->create_init_version(sc_params->new_tablet->tablet_id(),
+
sc_params->new_tablet->schema_hash(),
new_segment_group->version(),
new_segment_group->version_hash(),
new_segment_group);
@@ -2065,7 +2093,7 @@ OLAPStatus
SchemaChangeHandler::_alter_tablet(SchemaChangeParams* sc_params) {
(*it)->set_delete_status(DEL_NOT_SATISFIED);
}
- if (DEL_SATISFIED != del_ret && !sc_procedure->process(*it,
new_segment_group)) {
+ if (DEL_SATISFIED != del_ret && !sc_procedure->process(*it,
new_segment_group, sc_params->new_tablet)) {
//if del_ret is DEL_SATISFIED, the new delta version has already
been created in new_tablet
OLAP_LOG_WARNING("failed to process the version.
[version='%d-%d']",
(*it)->version().first, (*it)->version().second);
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 3f9ff7c2..6686b39b 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -25,6 +25,7 @@
#include "gen_cpp/AgentService_types.h"
#include "olap/delete_handler.h"
#include "olap/rowset/column_data.h"
+#include "olap/tablet.h"
namespace doris {
// defined in 'field.h'
@@ -150,7 +151,9 @@ class SchemaChange {
SchemaChange() : _filted_rows(0), _merged_rows(0) {}
virtual ~SchemaChange() {}
- virtual bool process(ColumnData* olap_data, SegmentGroup*
new_segment_group) = 0;
+ virtual bool process(ColumnData* olap_data,
+ SegmentGroup* new_segment_group,
+ TabletSharedPtr tablet) = 0;
void add_filted_rows(uint64_t filted_rows) {
_filted_rows += filted_rows;
@@ -195,7 +198,8 @@ class LinkedSchemaChange : public SchemaChange {
TabletSharedPtr new_tablet);
~LinkedSchemaChange() {}
- bool process(ColumnData* olap_data, SegmentGroup* new_segment_group);
+ bool process(ColumnData* olap_data, SegmentGroup* new_segment_group,
+ TabletSharedPtr tablet);
private:
TabletSharedPtr _base_tablet;
TabletSharedPtr _new_tablet;
@@ -212,7 +216,8 @@ class SchemaChangeDirectly : public SchemaChange {
const RowBlockChanger& row_block_changer);
virtual ~SchemaChangeDirectly();
- virtual bool process(ColumnData* olap_data, SegmentGroup*
new_segment_group);
+ virtual bool process(ColumnData* olap_data, SegmentGroup*
new_segment_group,
+ TabletSharedPtr tablet);
private:
TabletSharedPtr _tablet;
@@ -235,7 +240,8 @@ class SchemaChangeWithSorting : public SchemaChange {
size_t memory_limitation);
virtual ~SchemaChangeWithSorting();
- virtual bool process(ColumnData* olap_data, SegmentGroup*
new_segment_group);
+ virtual bool process(ColumnData* olap_data, SegmentGroup*
new_segment_group,
+ TabletSharedPtr tablet);
private:
bool _internal_sorting(
@@ -245,7 +251,8 @@ class SchemaChangeWithSorting : public SchemaChange {
bool _external_sorting(
std::vector<SegmentGroup*>& src_segment_group_arr,
- SegmentGroup* segment_group);
+ SegmentGroup* segment_group,
+ TabletSharedPtr tablet);
TabletSharedPtr _tablet;
const RowBlockChanger& _row_block_changer;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 06bf704c..cc7f6273 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -829,7 +829,7 @@ TabletSharedPtr StorageEngine::create_tablet(
return nullptr;
}
} else {
- stores.push_back(ref_tablet->store());
+ stores.push_back(ref_tablet->data_dir());
}
return _tablet_mgr.create_tablet(request, ref_root_path,
@@ -1027,12 +1027,11 @@ void StorageEngine::add_unused_index(SegmentGroup*
segment_group) {
auto it = _gc_files.find(segment_group);
if (it == _gc_files.end()) {
vector<string> files;
- int32_t segment_group_id = segment_group->segment_group_id();
for (size_t seg_id = 0; seg_id < segment_group->num_segments();
++seg_id) {
- string index_file =
segment_group->construct_index_file_path(segment_group_id, seg_id);
+ string index_file =
segment_group->construct_index_file_path(seg_id);
files.push_back(index_file);
- string data_file =
segment_group->construct_data_file_path(segment_group_id, seg_id);
+ string data_file = segment_group->construct_data_file_path(seg_id);
files.push_back(data_file);
}
_gc_files[segment_group] = files;
@@ -1203,7 +1202,7 @@ OLAPStatus StorageEngine::cancel_delete(const
TCancelDeleteDataReq& request) {
DeleteConditionHandler cond_handler;
for (TabletSharedPtr temp_tablet : table_list) {
temp_tablet->obtain_header_wrlock();
- res = cond_handler.delete_cond(temp_tablet, request.version, false);
+ res = cond_handler.delete_cond(&temp_tablet->delete_predicates(),
request.version, false);
if (res != OLAP_SUCCESS) {
temp_tablet->release_header_lock();
OLAP_LOG_WARNING("cancel delete failed. [res=%d tablet=%s]",
@@ -1223,7 +1222,7 @@ OLAPStatus StorageEngine::cancel_delete(const
TCancelDeleteDataReq& request) {
// Show delete conditions in tablet header.
for (TabletSharedPtr tablet : table_list) {
- cond_handler.log_conds(tablet);
+ cond_handler.log_conds(tablet->full_name(),
tablet->delete_predicates());
}
LOG(INFO) << "finish to process cancel delete. res=" << res;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index dcd6e98c..1d14b063 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -50,22 +50,19 @@ using std::set;
using std::sort;
using std::string;
using std::stringstream;
-using vector;
+using std::vector;
namespace doris {
-Tablet(TabletMeta* tablet_meta, DataDir* data_dir) {
+Tablet::Tablet(TabletMeta* tablet_meta, DataDir* data_dir) {
_data_dir = data_dir;
- rs_graph->construct_rowset_graph(tablet_meta->get_all_rowsets());
+ _rs_graph->construct_rowset_graph(tablet_meta->all_rs_metas());
}
Tablet::~Tablet() {
// ensure that there is nobody using Tablet, like acquiring
OLAPData(SegmentGroup)
WriteLock wrlock(&_meta_lock);
- for (auto& it : _version_rowset_map) {
- SAFE_DELETE(it.second);
- }
- _version_rowset_map.clear();
+ _rs_version_map.clear();
}
OLAPStatus Tablet::init_once() {
@@ -75,7 +72,7 @@ OLAPStatus Tablet::init_once() {
const RowsetMeta* rs_meta = _tablet_meta.get_rs_meta(ser);
Version version = rs_meta->version();
RowsetSharedPtr rowset(new Rowset(rs_meta));
- _version_rowset_map[version] = rowset;
+ _rs_version_map[version] = rowset;
rowset->init();
}
return OLAP_SUCCESS;
@@ -86,12 +83,12 @@ bool Tablet::can_do_compaction() {
// 如果选路成功,则转换完成,可以进行BE
// 如果选路失败,则转换未完成,不能进行BE
ReadLock rdlock(&_meta_lock);
- RowsetSharedPtr lastest_rowset = lastest_version();
+ const PDelta* lastest_rowset = lastest_version();
if (lastest_rowset == NULL) {
return false;
}
- Version test_version = Version(0, lastest_version->end_version());
+ Version test_version = Version(0, lastest_delta->end_version());
vector<Version> path_versions;
if (OLAP_SUCCESS != _rs_graph->capture_consistent_versions(test_version,
&path_versions)) {
LOG(WARNING) << "tablet has missed version. tablet=" << full_name();
@@ -99,7 +96,7 @@ bool Tablet::can_do_compaction() {
}
if (this->is_schema_changing()) {
- Version test_version = Version(0, lastest_version->end_version());
+ Version test_version = Version(0, lastest_delta->end_version());
vector<Version> path_versions;
if (OLAP_SUCCESS !=
_rs_graph->capture_consistent_versions(test_version, &path_versions)) {
return false;
@@ -122,17 +119,17 @@ OLAPStatus Tablet::capture_consistent_versions(
OLAPStatus Tablet::capture_consistent_rowsets(const Version& spec_version,
vector<std::shared_ptr<RowsetReader>>* rs_readers) {
vector<Version> version_path;
- _rs_graph->capture_consistent_versions(spec_version, version_graph);
+ _rs_graph->capture_consistent_versions(spec_version, &version_path);
- acquire_rs_reader_by_version(version_graph, rs_readers);
- return OLAP_SUCCESS;
+ acquire_rs_reader_by_version(version_path, rs_readers);
+ return OLAP_SUCCESS;
}
void Tablet::acquire_rs_reader_by_version(const vector<Version>& version_vec,
vector<std::shared_ptr<RowsetReader>>* rs_readers) const {
DCHECK(rs_readers != NULL && rs_readers->empty());
for (auto version : version_vec) {
- auto it2 = _rs_version_map.find(*it1);
+ auto it2 = _rs_version_map.find(version);
if (it2 == _rs_version_map.end()) {
LOG(WARNING) << "fail to find Rowset for version. tablet=" <<
full_name()
<< ", version='" << version.first << "-" <<
version.second;
@@ -140,14 +137,14 @@ void Tablet::acquire_rs_reader_by_version(const
vector<Version>& version_vec,
return;
}
- std::shared_ptr<RowsetReader> rs_reader = RowsetReader::create(*it2);
+ std::shared_ptr<RowsetReader> rs_reader(new RowsetReader());
Status status = rs_reader->init();
if (!status.ok()) {
LOG(WARNING) << "fail to init rowset_reader. tablet=" <<
full_name()
<< ", version=" << version.first << "-" <<
version.second;
release_rs_readers(rs_readers);
}
- rs_reader->push_back(std::move(rs_reader)):
+ rs_readers->push_back(std::move(rs_reader)):
}
}
@@ -162,17 +159,17 @@ OLAPStatus
Tablet::release_rs_readers(vector<std::shared_ptr<RowsetReader>>* rs_
}
OLAPStatus Tablet::add_inc_rowset(const Rowset& rowset) {
- return _table_meta.add_inc_rs_meta(rowset->get_rs_meta());
+ return _table_meta.add_inc_rs_meta(rowset.get_rs_meta());
}
OLAPStatus Tablet::delete_expired_inc_rowset() {
time_t now = time(NULL);
vector<Version> expired_versions;
WriteLock wrlock(&_meta_lock);
- for (auto& it : _tablet_meta->all_inc_rowsets()) {
+ for (auto& it : _tablet_meta.all_inc_rs_metas()) {
double diff = difftime(now, it.creation_time());
- if (diff >= config::inc_rowset_expire_sec) {
- expire_versions.push_back(it->version());
+ if (diff >= config::inc_rowset_expired_sec) {
+ expired_versions.push_back(it->version());
}
}
for (auto& it : expired_versions) {
@@ -181,7 +178,7 @@ OLAPStatus Tablet::delete_expired_inc_rowset() {
<< ", version=" << it.first << "-" << it.second;
}
- if (_tablet_meta->save_meta() != OLAP_SUCCESS) {
+ if (_tablet_meta.save_meta() != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save tablet_meta when delete expired
inc_rowset. "
<< "tablet=" << full_name();
}
@@ -189,7 +186,7 @@ OLAPStatus Tablet::delete_expired_inc_rowset() {
}
void Tablet::delete_inc_rowset_by_version(const Version& version) {
- _tablet_meta->delete_inc_rs_meta_by_version(version);
+ _tablet_meta.delete_inc_rs_meta_by_version(version);
VLOG(3) << "delete inc rowset. tablet=" << full_name()
<< ", version=" << version.first << "-" << version.second;
}
@@ -198,8 +195,8 @@ void Tablet::calc_missed_versions(int64_t spec_version,
vector<Version>* missed_versions) const {
DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
std::list<Version> existing_versions;
- for (RowsetMeta& rs : _tablet_meta->get_all_rs_metas()) {
- existing_versions.emplace_back(rs.version());
+ for (RowsetMeta& rs : _tablet_meta.all_rs_metas()) {
+ existing_versions.emplace_back(rs.get_version());
}
// sort the existing versions in ascending order
@@ -228,20 +225,20 @@ void Tablet::calc_missed_versions(int64_t spec_version,
OLAPStatus Tablet::modify_rowsets(vector<RowsetSharedPtr>& to_add,
vector<RowsetSharedPtr>& to_delete) {
- return OLAP_SUCESS;
+ return OLAP_SUCCESS;
}
RowsetSharedPtr Tablet::rowset_with_largest_size() {
RowsetSharedPtr largest_rowset = nullptr;
size_t ser = 0;
- for (auto& it : _version_rowset_map) {
+ for (auto& it : _rs_version_map) {
// use segment_group of base file as target segment_group when base is
not empty,
// or try to find the biggest segment_group.
if (largest_rowset->empty() || largest_rowset->zero_num_rows()) {
continue;
}
- if (it.second->index_size() > largest_rowset->index_size()) {
+ if (it.second->get_rs_meta().get_index_index_size() >
largest_rowset->get_rs_meta().get_index_disk_size()) {
largest_rowset = it.second;
}
}
@@ -268,14 +265,14 @@ OLAPStatus Tablet::split_range(
RowBlockPosition step_pos;
// 此helper用于辅助查找,注意它的内容不能拿来使用,是不可预知的,仅作为辅助使用
- if (helper_cursor.init(_tablet_schema, num_short_key_fields()) !=
OLAP_SUCCESS) {
+ if (helper_cursor.init(tablet_schema(), num_short_key_fields()) !=
OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to parse strings to key with RowCursor type.");
return OLAP_ERR_INVALID_SCHEMA;
}
// 如果有startkey,用startkey初始化;反之则用minkey初始化
if (start_key_strings.size() > 0) {
- if (start_key.init_scan_key(_tablet_schema,
start_key_strings.values()) != OLAP_SUCCESS) {
+ if (start_key.init_scan_key(tablet_schema(),
start_key_strings.values()) != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to initial key strings with RowCursor
type.");
return OLAP_ERR_INIT_FAILED;
}
@@ -285,18 +282,18 @@ OLAPStatus Tablet::split_range(
return OLAP_ERR_INVALID_SCHEMA;
}
} else {
- if (start_key.init(_tablet_schema, num_short_key_fields()) !=
OLAP_SUCCESS) {
+ if (start_key.init(tablet_schema(), num_short_key_fields()) !=
OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to initial key strings with RowCursor
type.");
return OLAP_ERR_INIT_FAILED;
}
- start_key.allocate_memory_for_string_type(_tablet_schema);
+ start_key.allocate_memory_for_string_type(tablet_schema());
start_key.build_min_key();
}
// 和startkey一样处理,没有则用maxkey初始化
if (end_key_strings.size() > 0) {
- if (OLAP_SUCCESS != end_key.init_scan_key(_tablet_schema,
end_key_strings.values())) {
+ if (OLAP_SUCCESS != end_key.init_scan_key(tablet_schema(),
end_key_strings.values())) {
OLAP_LOG_WARNING("fail to parse strings to key with RowCursor
type.");
return OLAP_ERR_INVALID_SCHEMA;
}
@@ -306,17 +303,17 @@ OLAPStatus Tablet::split_range(
return OLAP_ERR_INVALID_SCHEMA;
}
} else {
- if (end_key.init(_tablet_schema, num_short_key_fields()) !=
OLAP_SUCCESS) {
+ if (end_key.init(tablet_schema(), num_short_key_fields()) !=
OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to initial key strings with RowCursor
type.");
return OLAP_ERR_INIT_FAILED;
}
- end_key.allocate_memory_for_string_type(_tablet_schema);
+ end_key.allocate_memory_for_string_type(tablet_schema());
end_key.build_max_key();
}
ReadLock rdlock(get_header_lock_ptr());
- RowsetSharedPtr base_index = get_largest_rowset();
+ SegmentGroup* base_index = get_largest_index();
// 如果找不到合适的segment_group,就直接返回startkey,endkey
if (base_index == NULL) {
@@ -361,8 +358,8 @@ OLAPStatus Tablet::split_range(
RowCursor cur_start_key;
RowCursor last_start_key;
- if (cur_start_key.init(_tablet_schema, num_short_key_fields()) !=
OLAP_SUCCESS
- || last_start_key.init(_tablet_schema, num_short_key_fields()) !=
OLAP_SUCCESS) {
+ if (cur_start_key.init(tablet_schema(), num_short_key_fields()) !=
OLAP_SUCCESS
+ || last_start_key.init(tablet_schema(), num_short_key_fields()) !=
OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init cursor");
return OLAP_ERR_INIT_FAILED;
}
@@ -406,22 +403,22 @@ OLAPStatus Tablet::split_range(
}
bool Tablet::has_version(const Version& version) const {
- return (_version_rowset_map.find(version) != _version_rowset_map.end());
+ return (_rs_version_map.find(version) != _rs_version_map.end());
}
void Tablet::list_versions(vector<Version>* versions) const {
- DCHECK(versions != nullptr && versions.empty());
+ DCHECK(versions != nullptr && versions->empty());
// versions vector is not sorted.
- for (auto& it : _version_rowset_map) {
- versions->push_back(it->first);
+ for (auto& it : _rs_version_map) {
+ versions->push_back(it.first);
}
}
void Tablet::list_entities(vector<VersionEntity>* entities) const {
DCHECK(entities != nullptr && entities.empty());
- for (auto& it : _version_rowset_map) {
+ for (auto& it : _rs_version_map) {
RowsetSharedPtr rowset = it->second;
VersionEntity entity(it->first, rowset);
entities->push_back(entity);
@@ -438,7 +435,7 @@ size_t Tablet::get_row_size() const {
int64_t Tablet::get_data_size() const {
int64_t total_size = 0;
- for (auto& it : _version_rowset_map) {
+ for (auto& it : _rs_version_map) {
total_size += it.second->get_data_size();
}
return total_size;
@@ -446,7 +443,7 @@ int64_t Tablet::get_data_size() const {
int64_t Tablet::get_num_rows() const {
int64_t num_rows = 0;
- for (auto& it : _version_rowset_map) {
+ for (auto& it : _rs_version_map) {
total_size += it.second->get_num_rows();
}
}
@@ -456,7 +453,7 @@ bool Tablet::is_deletion_rowset(const Version& version) {
return false;
}
- for (auto& it : _tablet_meta->delete_predicates()) {
+ for (auto& it : _tablet_meta.delete_predicates()) {
if (it->version() == version) {
return true;
}
@@ -468,22 +465,22 @@ bool Tablet::is_deletion_rowset(const Version& version) {
bool Tablet::is_schema_changing() {
bool is_schema_changing = false;
- ReadLock rdlock(_meta_lock);
- if (_tablet_meta->alter_state() != AlterTabletState::NONE) {
+ ReadLock rdlock(&_meta_lock);
+ if (_tablet_meta.alter_state() != AlterTabletState::NONE) {
is_schema_changing = true;
}
return is_schema_changing;
}
-bool Tablet::get_schema_change_request(int64_t* tablet_id, int64_t*
schema_hash,
+bool Tablet::get_schema_change_request(int64_t* tablet_id, TSchemaHash*
schema_hash,
vector<Version>* versions_to_alter,
AlterTabletType* alter_tablet_type)
const {
- if (_tablet_meta->alter_state() == AlterTabletState::none) {
+ if (_tablet_meta.alter_state() == AlterTabletState::none) {
return false;
}
- const AlterTabletTask alter_task = _tablet_meta->alter_task();
+ const AlterTabletTask alter_task = _tablet_meta.alter_task();
*tablet_id = alter_task.related_tablet_id();
*schema_hash = alter_task.related_schema_hash();
*alter_tablet_type = alter_task.alter_tablet_type();
@@ -508,7 +505,7 @@ void Tablet::set_schema_change_request(int64_t tablet_id,
}
alter_task->set_alter_tablet_type(alter_tablet_type);
- _tablet_meta->add_alter_task();
+ _tablet_meta.add_alter_task();
}
bool Tablet::remove_last_schema_change_version(TabletSharedPtr new_tablet) {
@@ -516,29 +513,27 @@ bool
Tablet::remove_last_schema_change_version(TabletSharedPtr new_tablet) {
}
void Tablet::clear_schema_change_request() {
- LOG(INFO) << "clear schema change status. [tablet='" << _full_name << "']";
- _tablet_meta->delete_alter_task();
+ LOG(INFO) << "clear schema change status. [tablet='" << full_name() <<
"']";
+ _tablet_meta.delete_alter_task();
}
void Tablet::set_io_error() {
OLAP_LOG_WARNING("io error occur.[tablet_full_name='%s',
root_path_name='%s']",
- _full_name.c_str(),
- _storage_root_path.c_str());
- StorageEngine::get_instance()->set_store_used_flag(_storage_root_path,
false);
+ full_name().c_str());
}
bool Tablet::is_used() {
- return _store->is_used();
+ return _data_dir->is_used();
}
VersionEntity Tablet::get_version_entity_by_version(const Version& version) {
- RowsetSharedPtr rowset = _version_rowset_map[version];
+ RowsetSharedPtr rowset = _rs_version_map[version];
VersionEntity entity(version, rowset);
return entity;
}
size_t Tablet::get_version_data_size(const Version& version) {
- RowsetSharedPtr rowset = _version_rowset_map[version];
+ RowsetSharedPtr rowset = _rs_version_map[version];
return rowset->get_data_disk_size();
}
@@ -548,7 +543,7 @@ OLAPStatus
Tablet::recover_tablet_until_specfic_version(const int64_t& spec_vers
OLAPStatus Tablet::test_version(const Version& version) {
vector<Version> span_versions;
- ReadLock rdlock(_meta_lock);
+ ReadLock rdlock(&_meta_lock);
OLAPStatus res = _rs_graph->capture_consistent_versions(version,
&span_versions);
return res;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 0807e90c..c775721d 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -80,6 +80,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
void acquire_data_sources_by_versions(const std::vector<Version>&
version_list,
std::vector<ColumnData*>* sources)
const;
OLAPStatus release_data_sources(std::vector<ColumnData*>* data_sources)
const;
+ OLAPStatus register_data_source(const std::vector<SegmentGroup*>&
segment_group_vec);
OLAPStatus unregister_data_source(const Version& version,
std::vector<SegmentGroup*>* segment_group_vec);
OLAPStatus add_pending_version(int64_t partition_id, int64_t
transaction_id,
const std::vector<std::string>*
delete_conditions);
@@ -92,7 +93,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
void load_pending_data();
OLAPStatus publish_version(int64_t transaction_id, Version version,
VersionHash version_hash);
const PDelta* get_incremental_delta(Version version) const;
- void get_missing_versions_with_meta_locked(
+ void get_missing_versions_with_header_locked(
int64_t until_version, std::vector<Version>* missing_versions)
const;
OLAPStatus is_push_for_delete(int64_t transaction_id, bool*
is_push_for_delete) const;
OLAPStatus clone_data(const TabletMeta& clone_header,
@@ -170,7 +171,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
void set_cumulative_layer_point(const int32_t new_point);
bool is_schema_changing();
bool get_schema_change_request(TTabletId* tablet_id,
- SchemaHash* schema_hash,
+ TSchemaHash* schema_hash,
std::vector<Version>* versions_to_changed,
AlterTabletType* alter_table_type) const;
void set_schema_change_request(int64_t tablet_id,
@@ -196,9 +197,13 @@ class Tablet : public std::enable_shared_from_this<Tablet>
{
size_t get_version_data_size(const Version& version);
OLAPStatus recover_tablet_until_specfic_version(const int64_t&
until_version,
const int64_t&
version_hash);
+ const std::string& rowset_path_prefix();
+ void set_id(int64_t id);
+ OLAPStatus register_tablet_into_dir();
+ OLAPStatus init_once();
OLAPStatus capture_consistent_rowsets(const Version& spec_version,
vector<std::shared_ptr<RowsetReader>>* rs_readers);
void acquire_rs_reader_by_version(const vector<Version>& version_vec,
@@ -231,6 +236,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
size_t get_row_size() const;
size_t get_index_size() const;
size_t all_rowsets_size() const;
+ size_t get_data_size() const;
size_t get_num_rows() const;
size_t get_rowset_size(const Version& version);
@@ -280,6 +286,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
uint32_t segment_size() const;
void set_io_error();
RowsetSharedPtr rowset_with_largest_size();
+ SegmentGroup* get_largest_index();
public:
DataDir* _data_dir;
TabletState _state;
@@ -301,7 +308,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
return seed;
}
};
- std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_version_rowset_map;
+ std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_rs_version_map;
DISALLOW_COPY_AND_ASSIGN(Tablet);
};
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 41ab3a8d..69e7bc74 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -44,7 +44,7 @@
#include "olap/schema_change.h"
#include "olap/data_dir.h"
#include "olap/utils.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
#include "util/time.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
@@ -241,7 +241,13 @@ OLAPStatus TabletManager::create_init_version(TTabletId
tablet_id, SchemaHash sc
break;
}
- new_segment_group = new(nothrow) SegmentGroup(tablet.get(), version,
version_hash, false, 0, 0);
+ new_segment_group = new(nothrow) SegmentGroup(tablet->tablet_id(),
+ tablet->tablet_schema(),
+ tablet->num_key_fields(),
+
tablet->num_short_key_fields(),
+
tablet->num_rows_per_row_block(),
+
tablet->rowset_path_prefix(),
+ version, version_hash,
false, 0, 0);
if (new_segment_group == NULL) {
LOG(WARNING) << "fail to malloc index. [tablet=" <<
tablet->full_name() << "]";
res = OLAP_ERR_MALLOC_ERROR;
@@ -881,11 +887,11 @@ OLAPStatus
TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>*
tablet_info.__set_transaction_ids(transaction_ids);
if (_available_storage_medium_type_count > 1) {
-
tablet_info.__set_storage_medium(tablet_ptr->store()->storage_medium());
+
tablet_info.__set_storage_medium(tablet_ptr->data_dir()->storage_medium());
}
tablet_info.__set_version_count(tablet_ptr->file_delta_size());
- tablet_info.__set_path_hash(tablet_ptr->store()->path_hash());
+ tablet_info.__set_path_hash(tablet_ptr->data_dir()->path_hash());
tablet.tablet_infos.push_back(tablet_info);
}
@@ -1104,6 +1110,7 @@ OLAPStatus TabletManager::_create_new_tablet_header(
}
// set basic information
+ /*
header->set_num_short_key_fields(request.tablet_schema.short_key_column_count);
header->set_compress_kind(COMPRESS_LZ4);
if (request.tablet_schema.keys_type == TKeysType::DUP_KEYS) {
@@ -1117,6 +1124,7 @@ OLAPStatus TabletManager::_create_new_tablet_header(
header->set_data_file_type(COLUMN_ORIENTED_FILE);
header->set_segment_size(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE);
header->set_num_rows_per_data_block(config::default_num_rows_per_column_file_block);
+ */
// set column information
uint32_t i = 0;
@@ -1132,7 +1140,7 @@ OLAPStatus TabletManager::_create_new_tablet_header(
LOG(WARNING) << "varchar type column should be the last short
key.";
return OLAP_ERR_SCHEMA_SCHEMA_INVALID;
}
- header->add_column();
+ //header->add_column();
if (true == is_schema_change_tablet) {
/*
* for schema change, compare old_tablet and new_tablet
@@ -1147,16 +1155,17 @@ OLAPStatus TabletManager::_create_new_tablet_header(
for (field_off = 0; field_off < field_num; ++field_off) {
if (ref_tablet->tablet_schema()[field_off].name ==
column.column_name) {
uint32_t unique_id =
ref_tablet->tablet_schema()[field_off].unique_id;
- header->mutable_column(i)->set_unique_id(unique_id);
+ //header->mutable_column(i)->set_unique_id(unique_id);
break;
}
}
if (field_off == field_num) {
- header->mutable_column(i)->set_unique_id(next_unique_id++);
+ //header->mutable_column(i)->set_unique_id(next_unique_id++);
}
} else {
- header->mutable_column(i)->set_unique_id(i);
+ //header->mutable_column(i)->set_unique_id(i);
}
+ /*
header->mutable_column(i)->set_name(column.column_name);
header->mutable_column(i)->set_is_root_column(true);
string data_type;
@@ -1212,6 +1221,7 @@ OLAPStatus TabletManager::_create_new_tablet_header(
header->mutable_column(i)->set_is_bf_column(column.is_bloom_filter_column);
has_bf_columns = true;
}
+ */
++i;
}
if (true == is_schema_change_tablet){
@@ -1223,6 +1233,7 @@ OLAPStatus TabletManager::_create_new_tablet_header(
} else {
header->set_next_column_unique_id(i);
}
+ /*
if (has_bf_columns && request.tablet_schema.__isset.bloom_filter_fpp) {
header->set_bf_fpp(request.tablet_schema.bloom_filter_fpp);
}
@@ -1231,8 +1242,9 @@ OLAPStatus TabletManager::_create_new_tablet_header(
<< "key_num=" << key_count << " short_key_num=" <<
request.tablet_schema.short_key_column_count;
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
+ */
- header->set_creation_time(time(NULL));
+ //header->set_creation_time(time(NULL));
header->set_cumulative_layer_point(-1);
header->set_tablet_id(request.tablet_id);
header->set_schema_hash(request.tablet_schema.schema_hash);
@@ -1275,7 +1287,7 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
_tablet_map.erase(tablet_id);
}
- res = dropped_tablet->store()->deregister_tablet(dropped_tablet.get());
+ res = dropped_tablet->data_dir()->deregister_tablet(dropped_tablet.get());
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to unregister from root path. [res=%d
tablet=%ld]",
res, tablet_id);
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 0dfb2542..5c02c723 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -16,6 +16,9 @@
// under the License.
#include "olap/tablet_meta.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/tablet_meta_manager.h"
namespace doris {
@@ -93,8 +96,8 @@ OLAPStatus TabletMeta::save_meta_unlock() {
string meta_binary;
serialize_unlock(&meta_binary);
OLAPStatus status = TabletMetaManager::save(_data_dir, _tablet_id,
_schema_hash, meta_binary);
- if (status != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to save tablet_meta. status=" <<
status.to_string()
+ if (status != OLAP_SUCCESS)
+ LOG(WARNING) << "fail to save tablet_meta. status=" << status
<< ", tablet_id=" << _tablet_id
<< ", schema_hash=" << _schema_hash;
}
@@ -115,12 +118,12 @@ OLAPStatus TabletMeta::to_tablet_pb_unlock(TabletMetaPB*
tablet_meta_pb) {
tablet_meta_pb->set_tablet_name(_tablet_name);
for (auto rs : _rs_metas) {
- rs.to_rowset_pb(tablet_meta_pb-.add_rs_meta());
+ rs.to_rowset_pb(tablet_meta_pb->add_rs_meta());
}
for (auto rs : _inc_rs_metas) {
- rs.to_rowset_pb(tablet_meta_pb.add_inc_rc_meta());
+ rs.to_rowset_pb(tablet_meta_pb->add_inc_rc_meta());
}
- _schema.to_schema_pb(tablet_meta_pb.mutable_schema());
+ _schema.to_schema_pb(tablet_meta_pb->mutable_schema());
return OLAP_SUCCESS;
}
@@ -144,7 +147,7 @@ OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMeta&
rs_meta) {
return OLAP_SUCCESS;
}
-OLAPStatus TabletMeta::delete_inc_rs_meta_by_version(const Version& version)
+OLAPStatus TabletMeta::delete_inc_rs_meta_by_version(const Version& version) {
std::lock_guard<std::mutex> lock(_mutex);
for (auto rs : _inc_rs_metas) {
if (rs.version() == version) {
@@ -160,7 +163,7 @@ OLAPStatus TabletMeta::delete_inc_rs_meta_by_version(const
Version& version)
return OLAP_SUCCESS;
}
-const RowsetMeta* TabletMeta::get_inc_rowset(const Version& version) const;
+const RowsetMeta* TabletMeta::get_inc_rowset(const Version& version) const {
std::lock_guard<std::mutex> lock(_mutex);
RowsetMeta* rs_meta = nullptr;
for (int i = 0; i < _inc_rs_metas.size(); ++i) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index bf328bff..fa6fbe74 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -27,6 +27,7 @@
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/tablet_schema.h"
+#include "olap/rowset/rowset.h"
using std::string;
using std::vector;
@@ -48,12 +49,8 @@ enum AlterTabletState {
ALTER_FAILED
};
-class Rowset;
-using RowsetSharedPtr = std::shared_ptr<Rowset>;
-
class RowsetMeta;
-using RowsetMetaSharedPtr = std::shared_ptr<RowsetMeta>;
-
+class Rowset;
class DataDir;
class AlterTabletTask {
@@ -64,6 +61,8 @@ class AlterTabletTask {
inline int64_t related_tablet_id() { return _related_tablet_id; }
inline int64_t related_schema_hash() { return _related_schema_hash; }
+ inline int64_t set_related_tablet_id(int64_t related_tablet_id) {
_related_tablet_id = related_tablet_id; }
+ inline int64_t set_related_schema_hash(int64_t schema_hash) {
_related_schema_hash = schema_hash; }
vector<RowsetMetaSharedPtr>& rowsets_to_alter() { return
_rowsets_to_alter; }
@@ -79,13 +78,41 @@ class AlterTabletTask {
class TabletMeta {
public:
- TabletMeta(const std::string& file_name);
- TabletMeta(DataDir* data_dir);
-
+ OLAPStatus init();
OLAPStatus load_and_init();
FileVersionMessage& file_version(int32_t index);
int file_version_size();
+ int file_delta_size();
+ Version get_latest_version();
OLAPStatus set_shard(int32_t shard_id);
+ OLAPStatus save(const std::string& file_path);
+ OLAPStatus clear_schema_change_status();
+ OLAPStatus delete_all_versions();
+ OLAPStatus delete_version(Version version);
+ OLAPStatus add_version(Version version, VersionHash version_hash,
+ int32_t segment_group_id, int32_t num_segments,
+ int64_t index_size, int64_t data_size, int64_t
num_rows,
+ bool empty, const std::vector<KeyRange>*
column_statistics);
+ const PDelta* get_incremental_version(Version version) const;
+ std::string& file_name() const;
+ const PDelta* get_delta(int index) const;
+ const PDelta* get_base_version() const;
+ const uint32_t get_cumulative_compaction_score() const;
+ const uint32_t get_base_compaction_score() const;
+ const OLAPStatus version_creation_time(const Version& version, int64_t*
creation_time) const;
+ void set_tablet_id(int64_t tablet_id);
+ void set_schema_hash(TSchemaHash schema_hash);
+ void set_cumulative_layer_point(int32_t point);
+ void set_next_column_unique_id(int32_t unique_id);
+ void set_compress_kind(CompressKind kind);
+ void set_keys_type(KeysType keys_type);
+ void set_data_file_type(DataFileType type);
+ int32_t cumulative_layer_point();
+ void set_num_rows_per_data_block(size_t
default_num_rows_per_column_file_block);
+
+ TabletMeta();
+ TabletMeta(const std::string& file_name);
+ TabletMeta(DataDir* data_dir);
OLAPStatus serialize(string* meta_binary);
OLAPStatus serialize_unlock(string* meta_binary);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 20005c45..500f5133 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -44,7 +44,7 @@
#include "olap/schema_change.h"
#include "olap/data_dir.h"
#include "olap/utils.h"
-#include "olap/rowset/data_writer.h"
+#include "olap/rowset/column_data_writer.h"
#include "util/time.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]