This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 003335c1c5 [refactor](schema change) spark dpp need not call convert rowset during load process (#11397)
003335c1c5 is described below

commit 003335c1c51173b1bdc10afbb231833c2830ca56
Author: yiguolei <[email protected]>
AuthorDate: Tue Aug 2 10:18:00 2022 +0800

    [refactor](schema change) spark dpp need not call convert rowset during load process (#11397)
    
    * remove unused schema change logic in push handler
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/olap/push_handler.cpp  | 154 ++++++++++++------------------------------
 be/src/olap/push_handler.h    |  17 +----
 be/src/olap/schema_change.cpp |  99 ---------------------------
 be/src/olap/schema_change.h   |   5 --
 4 files changed, 47 insertions(+), 228 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 3a934776f3..8c5637abb9 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -60,13 +60,15 @@ Status 
PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP
 
     DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl);
 
-    std::vector<TabletVars> tablet_vars(1);
-    tablet_vars[0].tablet = tablet;
-    res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars, 
tablet_info_vec);
+    res = _do_streaming_ingestion(tablet, request, push_type, tablet_info_vec);
 
     if (res.ok()) {
         if (tablet_info_vec != nullptr) {
-            _get_tablet_infos(tablet_vars, tablet_info_vec);
+            TTabletInfo tablet_info;
+            tablet_info.tablet_id = tablet->tablet_id();
+            tablet_info.schema_hash = tablet->schema_hash();
+            
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
+            tablet_info_vec->push_back(tablet_info);
         }
         LOG(INFO) << "process realtime push successfully. "
                   << "tablet=" << tablet->full_name() << ", partition_id=" << 
request.partition_id
@@ -78,7 +80,6 @@ Status 
PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP
 
 Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const 
TPushReq& request,
                                             PushType push_type,
-                                            std::vector<TabletVars>* 
tablet_vars,
                                             std::vector<TTabletInfo>* 
tablet_info_vec) {
     // add transaction in engine, then check sc status
     // lock, prevent sc handler checking transaction concurrently
@@ -99,10 +100,6 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr 
tablet, const TPushR
                 request.partition_id, tablet, request.transaction_id, 
load_id));
     }
 
-    if (tablet_vars->size() == 1) {
-        tablet_vars->resize(2);
-    }
-
     // not call validate request here, because realtime load does not
     // contain version info
 
@@ -110,117 +107,79 @@ Status 
PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
     // check delete condition if push for delete
     std::queue<DeletePredicatePB> del_preds;
     if (push_type == PUSH_FOR_DELETE) {
-        for (TabletVars& tablet_var : *tablet_vars) {
-            if (tablet_var.tablet == nullptr) {
-                continue;
-            }
-
-            DeletePredicatePB del_pred;
-            TabletSchema tablet_schema;
-            tablet_schema.copy_from(*tablet_var.tablet->tablet_schema());
-            if (!request.columns_desc.empty() && 
request.columns_desc[0].col_unique_id >= 0) {
-                tablet_schema.clear_columns();
-                for (const auto& column_desc : request.columns_desc) {
-                    tablet_schema.append_column(TabletColumn(column_desc));
-                }
-            }
-            res = DeleteHandler::generate_delete_predicate(tablet_schema, 
request.delete_conditions,
-                                                           &del_pred);
-            del_preds.push(del_pred);
-            if (!res.ok()) {
-                LOG(WARNING) << "fail to generate delete condition. res=" << 
res
-                             << ", tablet=" << tablet_var.tablet->full_name();
-                return res;
+        DeletePredicatePB del_pred;
+        TabletSchema tablet_schema;
+        tablet_schema.copy_from(*tablet->tablet_schema());
+        if (!request.columns_desc.empty() && 
request.columns_desc[0].col_unique_id >= 0) {
+            tablet_schema.clear_columns();
+            for (const auto& column_desc : request.columns_desc) {
+                tablet_schema.append_column(TabletColumn(column_desc));
             }
         }
+        res = DeleteHandler::generate_delete_predicate(tablet_schema, 
request.delete_conditions,
+                                                       &del_pred);
+        del_preds.push(del_pred);
+        if (!res.ok()) {
+            LOG(WARNING) << "fail to generate delete condition. res=" << res
+                         << ", tablet=" << tablet->full_name();
+            return res;
+        }
     }
 
     // check if version number exceed limit
-    if (tablet_vars->at(0).tablet->version_count() > 
config::max_tablet_version_num) {
-        LOG(WARNING) << "failed to push data. version count: "
-                     << tablet_vars->at(0).tablet->version_count()
+    if (tablet->version_count() > config::max_tablet_version_num) {
+        LOG(WARNING) << "failed to push data. version count: " << 
tablet->version_count()
                      << ", exceed limit: " << config::max_tablet_version_num
-                     << ". tablet: " << tablet_vars->at(0).tablet->full_name();
+                     << ". tablet: " << tablet->full_name();
         return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
     }
     auto tablet_schema = std::make_shared<TabletSchema>();
-    tablet_schema->copy_from(*tablet_vars->at(0).tablet->tablet_schema());
+    tablet_schema->copy_from(*tablet->tablet_schema());
     if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id 
>= 0) {
         tablet_schema->clear_columns();
         for (const auto& column_desc : request.columns_desc) {
             tablet_schema->append_column(TabletColumn(column_desc));
         }
     }
-
+    RowsetSharedPtr rowset_to_add;
     // writes
     if (push_type == PUSH_NORMAL_V2) {
-        res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
-                          &(tablet_vars->at(0).rowset_to_add), 
&(tablet_vars->at(1).rowset_to_add),
-                          tablet_schema);
+        res = _convert_v2(tablet, &rowset_to_add, tablet_schema);
 
     } else {
-        res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
-                       &(tablet_vars->at(0).rowset_to_add), 
&(tablet_vars->at(1).rowset_to_add),
-                       tablet_schema);
+        res = _convert(tablet, &rowset_to_add, tablet_schema);
     }
     if (!res.ok()) {
         LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << 
res
                      << ", failed to process realtime push."
                      << ", tablet=" << tablet->full_name()
                      << ", transaction_id=" << request.transaction_id;
-        for (TabletVars& tablet_var : *tablet_vars) {
-            if (tablet_var.tablet == nullptr) {
-                continue;
-            }
 
-            Status rollback_status = 
StorageEngine::instance()->txn_manager()->rollback_txn(
-                    request.partition_id, tablet_var.tablet, 
request.transaction_id);
-            // has to check rollback status to ensure not delete a committed 
rowset
-            if (rollback_status.ok()) {
-                
StorageEngine::instance()->add_unused_rowset(tablet_var.rowset_to_add);
-            }
+        Status rollback_status = 
StorageEngine::instance()->txn_manager()->rollback_txn(
+                request.partition_id, tablet, request.transaction_id);
+        // has to check rollback status to ensure not delete a committed rowset
+        if (rollback_status.ok()) {
+            StorageEngine::instance()->add_unused_rowset(rowset_to_add);
         }
         return res;
     }
 
     // add pending data to tablet
-    for (TabletVars& tablet_var : *tablet_vars) {
-        if (tablet_var.tablet == nullptr) {
-            continue;
-        }
 
-        if (push_type == PUSH_FOR_DELETE) {
-            
tablet_var.rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front());
-            del_preds.pop();
-        }
-        Status commit_status = 
StorageEngine::instance()->txn_manager()->commit_txn(
-                request.partition_id, tablet_var.tablet, 
request.transaction_id, load_id,
-                tablet_var.rowset_to_add, false);
-        if (commit_status != Status::OK() &&
-            commit_status.precise_code() != 
OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
-            res = commit_status;
-        }
+    if (push_type == PUSH_FOR_DELETE) {
+        rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front());
+        del_preds.pop();
     }
-    return res;
-}
-
-void PushHandler::_get_tablet_infos(const std::vector<TabletVars>& tablet_vars,
-                                    std::vector<TTabletInfo>* tablet_info_vec) 
{
-    for (const TabletVars& tablet_var : tablet_vars) {
-        if (tablet_var.tablet.get() == nullptr) {
-            continue;
-        }
-
-        TTabletInfo tablet_info;
-        tablet_info.tablet_id = tablet_var.tablet->tablet_id();
-        tablet_info.schema_hash = tablet_var.tablet->schema_hash();
-        
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
-        tablet_info_vec->push_back(tablet_info);
+    Status commit_status = 
StorageEngine::instance()->txn_manager()->commit_txn(
+            request.partition_id, tablet, request.transaction_id, load_id, 
rowset_to_add, false);
+    if (commit_status != Status::OK() &&
+        commit_status.precise_code() != 
OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
+        res = commit_status;
     }
+    return res;
 }
 
-Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr 
new_tablet,
-                                RowsetSharedPtr* cur_rowset, RowsetSharedPtr* 
new_rowset,
+Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* 
cur_rowset,
                                 TabletSchemaSPtr tablet_schema) {
     Status res = Status::OK();
     uint32_t num_rows = 0;
@@ -325,18 +284,6 @@ Status PushHandler::_convert_v2(TabletSharedPtr 
cur_tablet, TabletSharedPtr new_
 
         _write_bytes += (*cur_rowset)->data_disk_size();
         _write_rows += (*cur_rowset)->num_rows();
-
-        // 5. Convert data for schema change tables
-        VLOG_TRACE << "load to related tables of schema_change if possible.";
-        if (new_tablet != nullptr) {
-            res = SchemaChangeHandler::schema_version_convert(
-                    cur_tablet, new_tablet, cur_rowset, new_rowset, 
*_desc_tbl, tablet_schema);
-            if (!res.ok()) {
-                LOG(WARNING) << "failed to change schema version for delta."
-                             << "[res=" << res << " new_tablet='" << 
new_tablet->full_name()
-                             << "']";
-            }
-        }
     } while (false);
 
     VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << 
cur_tablet->full_name()
@@ -344,8 +291,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, 
TabletSharedPtr new_
     return res;
 }
 
-Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr 
new_tablet,
-                             RowsetSharedPtr* cur_rowset, RowsetSharedPtr* 
new_rowset,
+Status PushHandler::_convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* 
cur_rowset,
                              TabletSchemaSPtr tablet_schema) {
     Status res = Status::OK();
     RowCursor row;
@@ -466,18 +412,6 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, 
TabletSharedPtr new_tab
 
         _write_bytes += (*cur_rowset)->data_disk_size();
         _write_rows += (*cur_rowset)->num_rows();
-
-        // 7. Convert data for schema change tables
-        VLOG_TRACE << "load to related tables of schema_change if possible.";
-        if (new_tablet != nullptr) {
-            res = SchemaChangeHandler::schema_version_convert(
-                    cur_tablet, new_tablet, cur_rowset, new_rowset, 
*_desc_tbl, tablet_schema);
-            if (!res.ok()) {
-                LOG(WARNING) << "failed to change schema version for delta."
-                             << "[res=" << res << " new_tablet='" << 
new_tablet->full_name()
-                             << "']";
-            }
-        }
     } while (false);
 
     SAFE_DELETE(reader);
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index b22d319845..02384e9f2d 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -39,11 +39,6 @@ class BinaryReader;
 struct ColumnMapping;
 class RowCursor;
 
-struct TabletVars {
-    TabletSharedPtr tablet;
-    RowsetSharedPtr rowset_to_add;
-};
-
 class PushHandler {
 public:
     using SchemaMapping = std::vector<ColumnMapping>;
@@ -60,24 +55,18 @@ public:
     int64_t write_rows() const { return _write_rows; }
 
 private:
-    Status _convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr 
new_tablet_vec,
-                       RowsetSharedPtr* cur_rowset, RowsetSharedPtr* 
new_rowset,
+    Status _convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
                        TabletSchemaSPtr tablet_schema);
     // Convert local data file to internal formatted delta,
     // return new delta's SegmentGroup
-    Status _convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
-                    RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
+    Status _convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
                     TabletSchemaSPtr tablet_schema);
 
     // Only for debug
     std::string _debug_version_list(const Versions& versions) const;
 
-    void _get_tablet_infos(const std::vector<TabletVars>& tablet_infos,
-                           std::vector<TTabletInfo>* tablet_info_vec);
-
     Status _do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& 
request,
-                                   PushType push_type, vector<TabletVars>* 
tablet_vars,
-                                   std::vector<TTabletInfo>* tablet_info_vec);
+                                   PushType push_type, 
std::vector<TTabletInfo>* tablet_info_vec);
 
 private:
     // mainly tablet_id, version and delta file path
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 7672e10eec..abc83ff768 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -2039,105 +2039,6 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t 
tablet_id) {
     return _tablet_ids_in_converting.find(tablet_id) != 
_tablet_ids_in_converting.end();
 }
 
-Status SchemaChangeHandler::schema_version_convert(
-        TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, 
RowsetSharedPtr* base_rowset,
-        RowsetSharedPtr* new_rowset, DescriptorTbl desc_tbl, TabletSchemaSPtr 
base_schema_change) {
-    Status res = Status::OK();
-    LOG(INFO) << "begin to convert delta version for schema changing. "
-              << "base_tablet=" << base_tablet->full_name()
-              << ", new_tablet=" << new_tablet->full_name();
-
-    // a. Parse the Alter request and convert it into an internal 
representation
-    // Do not use the delete condition specified by the DELETE_DATA command
-    RowBlockChanger rb_changer(*new_tablet->tablet_schema(), desc_tbl);
-    bool sc_sorting = false;
-    bool sc_directly = false;
-
-    const std::unordered_map<std::string, AlterMaterializedViewParam> 
materialized_function_map;
-    if (res = _parse_request(base_tablet, new_tablet, &rb_changer, 
&sc_sorting, &sc_directly,
-                             materialized_function_map, desc_tbl, 
base_schema_change.get());
-        !res) {
-        LOG(WARNING) << "failed to parse the request. res=" << res;
-        return res;
-    }
-
-    // NOTE split_table if row_block is used, the original block will become 
smaller
-    // But since the historical data will become normal after the subsequent 
base/cumulative, it is also possible to use directly
-    // b. Generate historical data converter
-    auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);
-
-    // c. Convert data
-    DeleteHandler delete_handler;
-    std::vector<ColumnId> return_columns;
-    size_t num_cols = base_schema_change->num_columns();
-    return_columns.resize(num_cols);
-    for (int i = 0; i < num_cols; ++i) {
-        return_columns[i] = i;
-    }
-
-    RowsetReaderContext reader_context;
-    reader_context.reader_type = READER_ALTER_TABLE;
-    reader_context.tablet_schema = base_schema_change.get();
-    reader_context.need_ordered_result = true;
-    reader_context.delete_handler = &delete_handler;
-    reader_context.return_columns = &return_columns;
-    reader_context.seek_columns = &return_columns;
-    reader_context.sequence_id_idx = 
reader_context.tablet_schema->sequence_col_idx();
-    reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
-    reader_context.is_vec = config::enable_vectorized_alter_table;
-
-    RowsetReaderSharedPtr rowset_reader;
-    RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
-    RETURN_NOT_OK(rowset_reader->init(&reader_context));
-    PUniqueId load_id;
-    load_id.set_hi((*base_rowset)->load_id().hi());
-    load_id.set_lo((*base_rowset)->load_id().lo());
-    std::unique_ptr<RowsetWriter> rowset_writer;
-    RETURN_NOT_OK(new_tablet->create_rowset_writer(
-            (*base_rowset)->txn_id(), load_id, PREPARED,
-            (*base_rowset)->rowset_meta()->segments_overlap(), 
base_schema_change, &rowset_writer));
-
-    auto schema_version_convert_error = [&]() -> Status {
-        if (*new_rowset != nullptr) {
-            StorageEngine::instance()->add_unused_rowset(*new_rowset);
-        }
-
-        LOG(WARNING) << "failed to convert rowsets. "
-                     << " base_tablet=" << base_tablet->full_name()
-                     << ", new_tablet=" << new_tablet->full_name() << " res = 
" << res;
-        return res;
-    };
-
-    if (res = sc_procedure->process(rowset_reader, rowset_writer.get(), 
new_tablet, base_tablet);
-        !res) {
-        if ((*base_rowset)->is_pending()) {
-            LOG(WARNING) << "failed to process the transaction when schema 
change. "
-                         << "tablet=" << new_tablet->full_name() << "'"
-                         << ", transaction=" << (*base_rowset)->txn_id();
-        } else {
-            LOG(WARNING) << "failed to process the version. "
-                         << "version=" << (*base_rowset)->version().first << 
"-"
-                         << (*base_rowset)->version().second;
-        }
-        new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
-                                                   
rowset_writer->rowset_id().to_string());
-        return schema_version_convert_error();
-    }
-    *new_rowset = rowset_writer->build();
-    new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
-                                               
rowset_writer->rowset_id().to_string());
-    if (*new_rowset == nullptr) {
-        LOG(WARNING) << "build rowset failed.";
-        res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
-        return schema_version_convert_error();
-    }
-
-    LOG(INFO) << "successfully convert rowsets. "
-              << " base_tablet=" << base_tablet->full_name()
-              << ", new_tablet=" << new_tablet->full_name();
-    return res;
-}
-
 Status SchemaChangeHandler::_get_versions_to_be_changed(
         TabletSharedPtr base_tablet, std::vector<Version>* 
versions_to_be_changed,
         RowsetSharedPtr* max_rowset) {
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 4ef608fb4b..0ce3761d4f 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -249,11 +249,6 @@ private:
 
 class SchemaChangeHandler {
 public:
-    static Status schema_version_convert(TabletSharedPtr base_tablet, 
TabletSharedPtr new_tablet,
-                                         RowsetSharedPtr* base_rowset, 
RowsetSharedPtr* new_rowset,
-                                         DescriptorTbl desc_tbl,
-                                         TabletSchemaSPtr base_schema_change);
-
     // schema change v2, it will not set alter task in base tablet
     static Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to