This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 003335c1c5 [refactor](schema change) spark dpp need not call convert rowset during load process (#11397)
003335c1c5 is described below
commit 003335c1c51173b1bdc10afbb231833c2830ca56
Author: yiguolei <[email protected]>
AuthorDate: Tue Aug 2 10:18:00 2022 +0800
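[refactor](schema change) spark dpp need not call convert rowset during load process (#11397)
* Remove the unused schema-change conversion logic from the push handler. PushHandler now commits the pushed rowset only to the target tablet. It no longer tracks a second (schema-changing) tablet via TabletVars or converts the rowset for it. As a result, the new_tablet/new_rowset parameters of _convert/_convert_v2 and SchemaChangeHandler::schema_version_convert are deleted.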
Co-authored-by: yiguolei <[email protected]>
---
be/src/olap/push_handler.cpp | 154 ++++++++++++------------------------------
be/src/olap/push_handler.h | 17 +----
be/src/olap/schema_change.cpp | 99 ---------------------------
be/src/olap/schema_change.h | 5 --
4 files changed, 47 insertions(+), 228 deletions(-)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 3a934776f3..8c5637abb9 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -60,13 +60,15 @@ Status
PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP
DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl);
- std::vector<TabletVars> tablet_vars(1);
- tablet_vars[0].tablet = tablet;
- res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars,
tablet_info_vec);
+ res = _do_streaming_ingestion(tablet, request, push_type, tablet_info_vec);
if (res.ok()) {
if (tablet_info_vec != nullptr) {
- _get_tablet_infos(tablet_vars, tablet_info_vec);
+ TTabletInfo tablet_info;
+ tablet_info.tablet_id = tablet->tablet_id();
+ tablet_info.schema_hash = tablet->schema_hash();
+
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
+ tablet_info_vec->push_back(tablet_info);
}
LOG(INFO) << "process realtime push successfully. "
<< "tablet=" << tablet->full_name() << ", partition_id=" <<
request.partition_id
@@ -78,7 +80,6 @@ Status
PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP
Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const
TPushReq& request,
PushType push_type,
- std::vector<TabletVars>*
tablet_vars,
std::vector<TTabletInfo>*
tablet_info_vec) {
// add transaction in engine, then check sc status
// lock, prevent sc handler checking transaction concurrently
@@ -99,10 +100,6 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr
tablet, const TPushR
request.partition_id, tablet, request.transaction_id,
load_id));
}
- if (tablet_vars->size() == 1) {
- tablet_vars->resize(2);
- }
-
// not call validate request here, because realtime load does not
// contain version info
@@ -110,117 +107,79 @@ Status
PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
// check delete condition if push for delete
std::queue<DeletePredicatePB> del_preds;
if (push_type == PUSH_FOR_DELETE) {
- for (TabletVars& tablet_var : *tablet_vars) {
- if (tablet_var.tablet == nullptr) {
- continue;
- }
-
- DeletePredicatePB del_pred;
- TabletSchema tablet_schema;
- tablet_schema.copy_from(*tablet_var.tablet->tablet_schema());
- if (!request.columns_desc.empty() &&
request.columns_desc[0].col_unique_id >= 0) {
- tablet_schema.clear_columns();
- for (const auto& column_desc : request.columns_desc) {
- tablet_schema.append_column(TabletColumn(column_desc));
- }
- }
- res = DeleteHandler::generate_delete_predicate(tablet_schema,
request.delete_conditions,
- &del_pred);
- del_preds.push(del_pred);
- if (!res.ok()) {
- LOG(WARNING) << "fail to generate delete condition. res=" <<
res
- << ", tablet=" << tablet_var.tablet->full_name();
- return res;
+ DeletePredicatePB del_pred;
+ TabletSchema tablet_schema;
+ tablet_schema.copy_from(*tablet->tablet_schema());
+ if (!request.columns_desc.empty() &&
request.columns_desc[0].col_unique_id >= 0) {
+ tablet_schema.clear_columns();
+ for (const auto& column_desc : request.columns_desc) {
+ tablet_schema.append_column(TabletColumn(column_desc));
}
}
+ res = DeleteHandler::generate_delete_predicate(tablet_schema,
request.delete_conditions,
+ &del_pred);
+ del_preds.push(del_pred);
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to generate delete condition. res=" << res
+ << ", tablet=" << tablet->full_name();
+ return res;
+ }
}
// check if version number exceed limit
- if (tablet_vars->at(0).tablet->version_count() >
config::max_tablet_version_num) {
- LOG(WARNING) << "failed to push data. version count: "
- << tablet_vars->at(0).tablet->version_count()
+ if (tablet->version_count() > config::max_tablet_version_num) {
+ LOG(WARNING) << "failed to push data. version count: " <<
tablet->version_count()
<< ", exceed limit: " << config::max_tablet_version_num
- << ". tablet: " << tablet_vars->at(0).tablet->full_name();
+ << ". tablet: " << tablet->full_name();
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
}
auto tablet_schema = std::make_shared<TabletSchema>();
- tablet_schema->copy_from(*tablet_vars->at(0).tablet->tablet_schema());
+ tablet_schema->copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id
>= 0) {
tablet_schema->clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema->append_column(TabletColumn(column_desc));
}
}
-
+ RowsetSharedPtr rowset_to_add;
// writes
if (push_type == PUSH_NORMAL_V2) {
- res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
- &(tablet_vars->at(0).rowset_to_add),
&(tablet_vars->at(1).rowset_to_add),
- tablet_schema);
+ res = _convert_v2(tablet, &rowset_to_add, tablet_schema);
} else {
- res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
- &(tablet_vars->at(0).rowset_to_add),
&(tablet_vars->at(1).rowset_to_add),
- tablet_schema);
+ res = _convert(tablet, &rowset_to_add, tablet_schema);
}
if (!res.ok()) {
LOG(WARNING) << "fail to convert tmp file when realtime push. res=" <<
res
<< ", failed to process realtime push."
<< ", tablet=" << tablet->full_name()
<< ", transaction_id=" << request.transaction_id;
- for (TabletVars& tablet_var : *tablet_vars) {
- if (tablet_var.tablet == nullptr) {
- continue;
- }
- Status rollback_status =
StorageEngine::instance()->txn_manager()->rollback_txn(
- request.partition_id, tablet_var.tablet,
request.transaction_id);
- // has to check rollback status to ensure not delete a committed
rowset
- if (rollback_status.ok()) {
-
StorageEngine::instance()->add_unused_rowset(tablet_var.rowset_to_add);
- }
+ Status rollback_status =
StorageEngine::instance()->txn_manager()->rollback_txn(
+ request.partition_id, tablet, request.transaction_id);
+ // has to check rollback status to ensure not delete a committed rowset
+ if (rollback_status.ok()) {
+ StorageEngine::instance()->add_unused_rowset(rowset_to_add);
}
return res;
}
// add pending data to tablet
- for (TabletVars& tablet_var : *tablet_vars) {
- if (tablet_var.tablet == nullptr) {
- continue;
- }
- if (push_type == PUSH_FOR_DELETE) {
-
tablet_var.rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front());
- del_preds.pop();
- }
- Status commit_status =
StorageEngine::instance()->txn_manager()->commit_txn(
- request.partition_id, tablet_var.tablet,
request.transaction_id, load_id,
- tablet_var.rowset_to_add, false);
- if (commit_status != Status::OK() &&
- commit_status.precise_code() !=
OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
- res = commit_status;
- }
+ if (push_type == PUSH_FOR_DELETE) {
+ rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front());
+ del_preds.pop();
}
- return res;
-}
-
-void PushHandler::_get_tablet_infos(const std::vector<TabletVars>& tablet_vars,
- std::vector<TTabletInfo>* tablet_info_vec)
{
- for (const TabletVars& tablet_var : tablet_vars) {
- if (tablet_var.tablet.get() == nullptr) {
- continue;
- }
-
- TTabletInfo tablet_info;
- tablet_info.tablet_id = tablet_var.tablet->tablet_id();
- tablet_info.schema_hash = tablet_var.tablet->schema_hash();
-
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
- tablet_info_vec->push_back(tablet_info);
+ Status commit_status =
StorageEngine::instance()->txn_manager()->commit_txn(
+ request.partition_id, tablet, request.transaction_id, load_id,
rowset_to_add, false);
+ if (commit_status != Status::OK() &&
+ commit_status.precise_code() !=
OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
+ res = commit_status;
}
+ return res;
}
-Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr
new_tablet,
- RowsetSharedPtr* cur_rowset, RowsetSharedPtr*
new_rowset,
+Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr*
cur_rowset,
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
uint32_t num_rows = 0;
@@ -325,18 +284,6 @@ Status PushHandler::_convert_v2(TabletSharedPtr
cur_tablet, TabletSharedPtr new_
_write_bytes += (*cur_rowset)->data_disk_size();
_write_rows += (*cur_rowset)->num_rows();
-
- // 5. Convert data for schema change tables
- VLOG_TRACE << "load to related tables of schema_change if possible.";
- if (new_tablet != nullptr) {
- res = SchemaChangeHandler::schema_version_convert(
- cur_tablet, new_tablet, cur_rowset, new_rowset,
*_desc_tbl, tablet_schema);
- if (!res.ok()) {
- LOG(WARNING) << "failed to change schema version for delta."
- << "[res=" << res << " new_tablet='" <<
new_tablet->full_name()
- << "']";
- }
- }
} while (false);
VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" <<
cur_tablet->full_name()
@@ -344,8 +291,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet,
TabletSharedPtr new_
return res;
}
-Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr
new_tablet,
- RowsetSharedPtr* cur_rowset, RowsetSharedPtr*
new_rowset,
+Status PushHandler::_convert(TabletSharedPtr cur_tablet, RowsetSharedPtr*
cur_rowset,
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
RowCursor row;
@@ -466,18 +412,6 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet,
TabletSharedPtr new_tab
_write_bytes += (*cur_rowset)->data_disk_size();
_write_rows += (*cur_rowset)->num_rows();
-
- // 7. Convert data for schema change tables
- VLOG_TRACE << "load to related tables of schema_change if possible.";
- if (new_tablet != nullptr) {
- res = SchemaChangeHandler::schema_version_convert(
- cur_tablet, new_tablet, cur_rowset, new_rowset,
*_desc_tbl, tablet_schema);
- if (!res.ok()) {
- LOG(WARNING) << "failed to change schema version for delta."
- << "[res=" << res << " new_tablet='" <<
new_tablet->full_name()
- << "']";
- }
- }
} while (false);
SAFE_DELETE(reader);
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index b22d319845..02384e9f2d 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -39,11 +39,6 @@ class BinaryReader;
struct ColumnMapping;
class RowCursor;
-struct TabletVars {
- TabletSharedPtr tablet;
- RowsetSharedPtr rowset_to_add;
-};
-
class PushHandler {
public:
using SchemaMapping = std::vector<ColumnMapping>;
@@ -60,24 +55,18 @@ public:
int64_t write_rows() const { return _write_rows; }
private:
- Status _convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr
new_tablet_vec,
- RowsetSharedPtr* cur_rowset, RowsetSharedPtr*
new_rowset,
+ Status _convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
TabletSchemaSPtr tablet_schema);
// Convert local data file to internal formatted delta,
// return new delta's SegmentGroup
- Status _convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
- RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
+ Status _convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
TabletSchemaSPtr tablet_schema);
// Only for debug
std::string _debug_version_list(const Versions& versions) const;
- void _get_tablet_infos(const std::vector<TabletVars>& tablet_infos,
- std::vector<TTabletInfo>* tablet_info_vec);
-
Status _do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq&
request,
- PushType push_type, vector<TabletVars>*
tablet_vars,
- std::vector<TTabletInfo>* tablet_info_vec);
+ PushType push_type,
std::vector<TTabletInfo>* tablet_info_vec);
private:
// mainly tablet_id, version and delta file path
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 7672e10eec..abc83ff768 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -2039,105 +2039,6 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t
tablet_id) {
return _tablet_ids_in_converting.find(tablet_id) !=
_tablet_ids_in_converting.end();
}
-Status SchemaChangeHandler::schema_version_convert(
- TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* base_rowset,
- RowsetSharedPtr* new_rowset, DescriptorTbl desc_tbl, TabletSchemaSPtr
base_schema_change) {
- Status res = Status::OK();
- LOG(INFO) << "begin to convert delta version for schema changing. "
- << "base_tablet=" << base_tablet->full_name()
- << ", new_tablet=" << new_tablet->full_name();
-
- // a. Parse the Alter request and convert it into an internal
representation
- // Do not use the delete condition specified by the DELETE_DATA command
- RowBlockChanger rb_changer(*new_tablet->tablet_schema(), desc_tbl);
- bool sc_sorting = false;
- bool sc_directly = false;
-
- const std::unordered_map<std::string, AlterMaterializedViewParam>
materialized_function_map;
- if (res = _parse_request(base_tablet, new_tablet, &rb_changer,
&sc_sorting, &sc_directly,
- materialized_function_map, desc_tbl,
base_schema_change.get());
- !res) {
- LOG(WARNING) << "failed to parse the request. res=" << res;
- return res;
- }
-
- // NOTE split_table if row_block is used, the original block will become
smaller
- // But since the historical data will become normal after the subsequent
base/cumulative, it is also possible to use directly
- // b. Generate historical data converter
- auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);
-
- // c. Convert data
- DeleteHandler delete_handler;
- std::vector<ColumnId> return_columns;
- size_t num_cols = base_schema_change->num_columns();
- return_columns.resize(num_cols);
- for (int i = 0; i < num_cols; ++i) {
- return_columns[i] = i;
- }
-
- RowsetReaderContext reader_context;
- reader_context.reader_type = READER_ALTER_TABLE;
- reader_context.tablet_schema = base_schema_change.get();
- reader_context.need_ordered_result = true;
- reader_context.delete_handler = &delete_handler;
- reader_context.return_columns = &return_columns;
- reader_context.seek_columns = &return_columns;
- reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
- reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
- reader_context.is_vec = config::enable_vectorized_alter_table;
-
- RowsetReaderSharedPtr rowset_reader;
- RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
- RETURN_NOT_OK(rowset_reader->init(&reader_context));
- PUniqueId load_id;
- load_id.set_hi((*base_rowset)->load_id().hi());
- load_id.set_lo((*base_rowset)->load_id().lo());
- std::unique_ptr<RowsetWriter> rowset_writer;
- RETURN_NOT_OK(new_tablet->create_rowset_writer(
- (*base_rowset)->txn_id(), load_id, PREPARED,
- (*base_rowset)->rowset_meta()->segments_overlap(),
base_schema_change, &rowset_writer));
-
- auto schema_version_convert_error = [&]() -> Status {
- if (*new_rowset != nullptr) {
- StorageEngine::instance()->add_unused_rowset(*new_rowset);
- }
-
- LOG(WARNING) << "failed to convert rowsets. "
- << " base_tablet=" << base_tablet->full_name()
- << ", new_tablet=" << new_tablet->full_name() << " res =
" << res;
- return res;
- };
-
- if (res = sc_procedure->process(rowset_reader, rowset_writer.get(),
new_tablet, base_tablet);
- !res) {
- if ((*base_rowset)->is_pending()) {
- LOG(WARNING) << "failed to process the transaction when schema
change. "
- << "tablet=" << new_tablet->full_name() << "'"
- << ", transaction=" << (*base_rowset)->txn_id();
- } else {
- LOG(WARNING) << "failed to process the version. "
- << "version=" << (*base_rowset)->version().first <<
"-"
- << (*base_rowset)->version().second;
- }
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
-
rowset_writer->rowset_id().to_string());
- return schema_version_convert_error();
- }
- *new_rowset = rowset_writer->build();
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
-
rowset_writer->rowset_id().to_string());
- if (*new_rowset == nullptr) {
- LOG(WARNING) << "build rowset failed.";
- res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
- return schema_version_convert_error();
- }
-
- LOG(INFO) << "successfully convert rowsets. "
- << " base_tablet=" << base_tablet->full_name()
- << ", new_tablet=" << new_tablet->full_name();
- return res;
-}
-
Status SchemaChangeHandler::_get_versions_to_be_changed(
TabletSharedPtr base_tablet, std::vector<Version>*
versions_to_be_changed,
RowsetSharedPtr* max_rowset) {
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 4ef608fb4b..0ce3761d4f 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -249,11 +249,6 @@ private:
class SchemaChangeHandler {
public:
- static Status schema_version_convert(TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet,
- RowsetSharedPtr* base_rowset,
RowsetSharedPtr* new_rowset,
- DescriptorTbl desc_tbl,
- TabletSchemaSPtr base_schema_change);
-
// schema change v2, it will not set alter task in base tablet
static Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]