This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f573918aa43 [Chore](materialized-view) output referenced column info when creating an MV cannot find the referenced column (#27182)
f573918aa43 is described below

commit f573918aa43ad6ffcf2729aecce6ccf26c40a829
Author: Pxl <[email protected]>
AuthorDate: Thu Nov 30 16:48:06 2023 +0800

    [Chore](materialized-view) output referenced column info when creating an MV cannot find the referenced column (#27182)
    
    output referenced column info when creating an MV cannot find the referenced column
---
 be/src/olap/column_mapping.h                       |   4 +-
 be/src/olap/schema_change.cpp                      | 125 +++++++++++----------
 .../suites/mv_p0/no_await/no_await.groovy          |   2 +-
 3 files changed, 69 insertions(+), 62 deletions(-)

diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h
index ba6e0a6857a..41564bda3cc 100644
--- a/be/src/olap/column_mapping.h
+++ b/be/src/olap/column_mapping.h
@@ -27,12 +27,12 @@ namespace doris {
 class WrapperField;
 
 struct ColumnMapping {
-    ColumnMapping() : ref_column(-1), default_value(nullptr) {}
+    ColumnMapping() = default;
     virtual ~ColumnMapping() = default;
 
     // <0: use default value
     // >=0: use origin column
-    int32_t ref_column;
+    int32_t ref_column = -1;
     // normally for default value. stores values for filters
     WrapperField* default_value = nullptr;
     std::shared_ptr<TExpr> expr;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index e29e1f277dc..40cf4f9f88d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -17,16 +17,13 @@
 
 #include "olap/schema_change.h"
 
-#include <gen_cpp/AgentService_types.h>
-#include <gen_cpp/Exprs_types.h>
-#include <gen_cpp/olap_file.pb.h>
-
 #include <algorithm>
 #include <exception>
 #include <map>
 #include <mutex>
 #include <roaring/roaring.hh>
 #include <tuple>
+#include <utility>
 
 #include "common/logging.h"
 #include "common/signal_handler.h"
@@ -82,7 +79,7 @@ class CollectionValue;
 
 using namespace ErrorCode;
 
-constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
+constexpr int ALTER_TABLE_BATCH_SIZE = 4064;
 
 class MultiBlockMerger {
 public:
@@ -91,7 +88,7 @@ public:
     Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
                  RowsetWriter* rowset_writer, uint64_t* merged_rows) {
         int rows = 0;
-        for (auto& block : blocks) {
+        for (const auto& block : blocks) {
             rows += block->rows();
         }
         if (!rows) {
@@ -100,7 +97,7 @@ public:
 
         std::vector<RowRef> row_refs;
         row_refs.reserve(rows);
-        for (auto& block : blocks) {
+        for (const auto& block : blocks) {
             for (uint16_t i = 0; i < block->rows(); i++) {
                 row_refs.emplace_back(block.get(), i);
             }
@@ -127,7 +124,7 @@ public:
                                     vectorized::AGG_LOAD_SUFFIX);
                     agg_functions.push_back(function);
                     // create aggregate data
-                    vectorized::AggregateDataPtr place = new 
char[function->size_of_data()];
+                    auto* place = new char[function->size_of_data()];
                     function->create(place);
                     agg_places.push_back(place);
                 } catch (...) {
@@ -150,7 +147,7 @@ public:
                 auto row_ref = row_refs[i];
 
                 for (int j = key_number; j < columns; j++) {
-                    auto column_ptr = row_ref.get_column(j).get();
+                    const auto* column_ptr = row_ref.get_column(j).get();
                     agg_functions[j - key_number]->add(
                             agg_places[j - key_number],
                             const_cast<const 
vectorized::IColumn**>(&column_ptr), row_ref.position,
@@ -243,7 +240,7 @@ private:
 };
 
 BlockChanger::BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl 
desc_tbl)
-        : _desc_tbl(desc_tbl) {
+        : _desc_tbl(std::move(desc_tbl)) {
     _schema_mapping.resize(tablet_schema->num_columns());
 }
 
@@ -307,22 +304,16 @@ Status BlockChanger::change_block(vectorized::Block* 
ref_block,
                                               _type));
             swap_idx_list.emplace_back(result_column_id, idx);
         } else if (_schema_mapping[idx].ref_column < 0) {
-            if (_type != ROLLUP) {
-                // new column, write default value
-                auto* value = _schema_mapping[idx].default_value;
-                auto column = 
new_block->get_by_position(idx).column->assume_mutable();
-                if (value->is_null()) {
-                    DCHECK(column->is_nullable());
-                    column->insert_many_defaults(row_size);
-                } else {
-                    auto type_info = 
get_type_info(_schema_mapping[idx].new_column);
-                    
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
-                                                                    
value->ptr(), column, row_size);
-                }
+            // new column, write default value
+            auto* value = _schema_mapping[idx].default_value;
+            auto column = 
new_block->get_by_position(idx).column->assume_mutable();
+            if (value->is_null()) {
+                DCHECK(column->is_nullable());
+                column->insert_many_defaults(row_size);
             } else {
-                return Status::Error<ErrorCode::INTERNAL_ERROR>(
-                        "rollup job meet invalid ref_column, new_column={}",
-                        _schema_mapping[idx].new_column->name());
+                auto type_info = 
get_type_info(_schema_mapping[idx].new_column);
+                
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
+                                                                value->ptr(), 
column, row_size);
             }
         } else {
             // same type, just swap column
@@ -378,7 +369,7 @@ Status 
BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
     }
     if (ref_column->is_nullable() != new_column->is_nullable()) {
         if (ref_column->is_nullable()) {
-            auto* ref_null_map =
+            const auto* ref_null_map =
                     
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
                             ->get_null_map_column()
                             .get_data()
@@ -392,7 +383,7 @@ Status 
BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
                 return Status::DataQualityError("Null data is changed to not 
nullable");
             }
         } else {
-            auto* new_null_map =
+            const auto* new_null_map =
                     
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
                             ->get_null_map_column()
                             .get_data()
@@ -409,12 +400,12 @@ Status 
BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
     }
 
     if (ref_column->is_nullable() && new_column->is_nullable()) {
-        auto* ref_null_map =
+        const auto* ref_null_map =
                 
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
                         ->get_null_map_column()
                         .get_data()
                         .data();
-        auto* new_null_map =
+        const auto* new_null_map =
                 
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
                         ->get_null_map_column()
                         .get_data()
@@ -461,12 +452,11 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr 
rowset_reader, RowsetWr
                     {rowset_reader->rowset()->rowset_id(), 0, 0},
                     {rowset_reader->rowset()->rowset_id(), UINT32_MAX, 
INT64_MAX},
                     &origin_delete_bitmap);
-            for (auto iter = origin_delete_bitmap.delete_bitmap.begin();
-                 iter != origin_delete_bitmap.delete_bitmap.end(); ++iter) {
+            for (auto& iter : origin_delete_bitmap.delete_bitmap) {
                 int ret = new_tablet->tablet_meta()->delete_bitmap().set(
-                        {rowset_writer->rowset_id(), std::get<1>(iter->first),
-                         std::get<2>(iter->first)},
-                        iter->second);
+                        {rowset_writer->rowset_id(), std::get<1>(iter.first),
+                         std::get<2>(iter.first)},
+                        iter.second);
                 DCHECK(ret == 1);
             }
         }
@@ -833,7 +823,7 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             }
             std::vector<RowsetMetaSharedPtr> del_preds;
             for (auto&& split : rs_splits) {
-                auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
+                const auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
                 if (!rs_meta->has_delete_predicate() || 
rs_meta->start_version() > end_version) {
                     continue;
                 }
@@ -912,7 +902,7 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
                     mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
                 }
                 sc_params.materialized_params_map.insert(
-                        std::make_pair(item.column_name, mv_param));
+                        std::make_pair(to_lower(item.column_name), mv_param));
             }
         }
         {
@@ -1043,9 +1033,9 @@ Status 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
 
     // find end version
     int32_t end_version = -1;
-    for (size_t i = 0; i < sc_params.ref_rowset_readers.size(); ++i) {
-        if (sc_params.ref_rowset_readers[i]->version().second > end_version) {
-            end_version = sc_params.ref_rowset_readers[i]->version().second;
+    for (const auto& ref_rowset_reader : sc_params.ref_rowset_readers) {
+        if (ref_rowset_reader->version().second > end_version) {
+            end_version = ref_rowset_reader->version().second;
         }
     }
 
@@ -1096,7 +1086,7 @@ Status 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
     auto sc_procedure = get_sc_procedure(changer, sc_sorting, sc_directly);
 
     // c.Convert historical data
-    for (auto& rs_reader : sc_params.ref_rowset_readers) {
+    for (const auto& rs_reader : sc_params.ref_rowset_readers) {
         VLOG_TRACE << "begin to convert a history rowset. version=" << 
rs_reader->version().first
                    << "-" << rs_reader->version().second;
 
@@ -1169,6 +1159,8 @@ Status 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
     return process_alter_exit();
 }
 
+static const std::string WHERE_SIGN_LOWER = to_lower("__DORIS_WHERE_SIGN__");
+
 // @static
 // Analyze the mapping of the column and the mapping of the filter key
 Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
@@ -1183,37 +1175,51 @@ Status SchemaChangeHandler::_parse_request(const 
SchemaChangeParams& sc_params,
     const std::unordered_map<std::string, AlterMaterializedViewParam>& 
materialized_function_map =
             sc_params.materialized_params_map;
     DescriptorTbl desc_tbl = *sc_params.desc_tbl;
+
+    if (sc_params.alter_tablet_type == ROLLUP) {
+        *sc_directly = true;
+    }
+
     // set column mapping
     for (int i = 0, new_schema_size = 
new_tablet->tablet_schema()->num_columns();
          i < new_schema_size; ++i) {
         const TabletColumn& new_column = 
new_tablet->tablet_schema()->column(i);
-        const std::string& column_name = new_column.name();
+        const std::string& column_name_lower = to_lower(new_column.name());
         ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
         column_mapping->new_column = &new_column;
 
-        if (materialized_function_map.find(column_name) != 
materialized_function_map.end()) {
-            auto mvParam = materialized_function_map.find(column_name)->second;
-            column_mapping->expr = mvParam.expr;
-            int32_t column_index = 
base_tablet_schema->field_index(mvParam.origin_column_name);
-            if (column_index >= 0) {
-                column_mapping->ref_column = column_index;
+        if (materialized_function_map.find(column_name_lower) != 
materialized_function_map.end()) {
+            auto mv_param = 
materialized_function_map.find(column_name_lower)->second;
+            column_mapping->expr = mv_param.expr;
+            int32_t column_index = 
base_tablet_schema->field_index(mv_param.origin_column_name);
+            column_mapping->ref_column = column_index;
+            if (column_index >= 0 || column_mapping->expr != nullptr) {
                 continue;
-            } else if (sc_params.alter_tablet_type != ROLLUP) {
-                return Status::Error<CE_CMD_PARAMS_ERROR>(
-                        "referenced column was missing. [column={} 
,origin_column={}]", column_name,
-                        mvParam.origin_column_name);
             }
         }
 
-        int32_t column_index = base_tablet_schema->field_index(column_name);
+        int32_t column_index = 
base_tablet_schema->field_index(new_column.name());
         if (column_index >= 0) {
             column_mapping->ref_column = column_index;
             continue;
         }
 
-        if (column_name.find("__doris_shadow_") == 0) {
+        if (sc_params.alter_tablet_type == ROLLUP) {
+            std::string materialized_function_map_str;
+            for (auto str : materialized_function_map) {
+                if (!materialized_function_map_str.empty()) {
+                    materialized_function_map_str += ',';
+                }
+                materialized_function_map_str += str.first;
+            }
+            return Status::InternalError(
+                    "referenced column was missing. 
[column={},materialized_function_map={}]",
+                    new_column.name(), materialized_function_map_str);
+        }
+
+        if (new_column.name().find("__doris_shadow_") == 0) {
             // Should delete in the future, just a protection for bug.
-            LOG(INFO) << "a shadow column is encountered " << column_name;
+            LOG(INFO) << "a shadow column is encountered " << 
new_column.name();
             return Status::InternalError("failed due to operate on shadow 
column");
         }
         // Newly added column go here
@@ -1226,12 +1232,13 @@ Status SchemaChangeHandler::_parse_request(const 
SchemaChangeParams& sc_params,
                 _init_column_mapping(column_mapping, new_column, 
new_column.default_value()));
 
         LOG(INFO) << "A column with default value will be added after schema 
changing. "
-                  << "column=" << column_name << ", default_value=" << 
new_column.default_value()
-                  << " to table " << new_tablet->get_table_id();
+                  << "column=" << new_column.name()
+                  << ", default_value=" << new_column.default_value() << " to 
table "
+                  << new_tablet->get_table_id();
     }
 
-    if (materialized_function_map.count(WHERE_SIGN)) {
-        
changer->set_where_expr(materialized_function_map.find(WHERE_SIGN)->second.expr);
+    if (materialized_function_map.contains(WHERE_SIGN_LOWER)) {
+        
changer->set_where_expr(materialized_function_map.find(WHERE_SIGN_LOWER)->second.expr);
     }
 
     // Check if re-aggregation is needed.
@@ -1325,7 +1332,7 @@ Status SchemaChangeHandler::_parse_request(const 
SchemaChangeParams& sc_params,
     // use directly schema change instead.
     if (!(*sc_directly) && !(*sc_sorting)) {
         // check has remote rowset
-        for (auto& rs_reader : sc_params.ref_rowset_readers) {
+        for (const auto& rs_reader : sc_params.ref_rowset_readers) {
             if (!rs_reader->rowset()->is_local()) {
                 *sc_directly = true;
                 break;
diff --git a/regression-test/suites/mv_p0/no_await/no_await.groovy 
b/regression-test/suites/mv_p0/no_await/no_await.groovy
index 526e3acde31..ec3e7f89ca0 100644
--- a/regression-test/suites/mv_p0/no_await/no_await.groovy
+++ b/regression-test/suites/mv_p0/no_await/no_await.groovy
@@ -26,7 +26,7 @@ suite ("no_await") {
         sql "sync;"
         while (!result.contains("FINISHED")) {
             result = (sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE 
TableName='${tblName}' ORDER BY CreateTime DESC LIMIT 1;")[0]
-            if (result.contains("CANCELLED")) {
+            if 
(!result.contains("RUNNING")&&!result.contains("PENDING")&&!result.contains("FINISHED")&&!result.contains("WAITING_TXN"))
 {
                 log.info("result: ${result}")
                 assertTrue(false)
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to