This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f573918aa43 [Chore](materialized-view) output reference column info
when create mv can't find ref column (#27182)
f573918aa43 is described below
commit f573918aa43ad6ffcf2729aecce6ccf26c40a829
Author: Pxl <[email protected]>
AuthorDate: Thu Nov 30 16:48:06 2023 +0800
[Chore](materialized-view) output reference column info when create mv
can't find ref column (#27182)
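When creating a materialized view (rollup) fails because a referenced column cannot be found, the error message now reports the missing column name together with the keys of the materialized function map.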
---
be/src/olap/column_mapping.h | 4 +-
be/src/olap/schema_change.cpp | 125 +++++++++++----------
.../suites/mv_p0/no_await/no_await.groovy | 2 +-
3 files changed, 69 insertions(+), 62 deletions(-)
diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h
index ba6e0a6857a..41564bda3cc 100644
--- a/be/src/olap/column_mapping.h
+++ b/be/src/olap/column_mapping.h
@@ -27,12 +27,12 @@ namespace doris {
class WrapperField;
struct ColumnMapping {
- ColumnMapping() : ref_column(-1), default_value(nullptr) {}
+ ColumnMapping() = default;
virtual ~ColumnMapping() = default;
// <0: use default value
// >=0: use origin column
- int32_t ref_column;
+ int32_t ref_column = -1;
// normally for default value. stores values for filters
WrapperField* default_value = nullptr;
std::shared_ptr<TExpr> expr;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index e29e1f277dc..40cf4f9f88d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -17,16 +17,13 @@
#include "olap/schema_change.h"
-#include <gen_cpp/AgentService_types.h>
-#include <gen_cpp/Exprs_types.h>
-#include <gen_cpp/olap_file.pb.h>
-
#include <algorithm>
#include <exception>
#include <map>
#include <mutex>
#include <roaring/roaring.hh>
#include <tuple>
+#include <utility>
#include "common/logging.h"
#include "common/signal_handler.h"
@@ -82,7 +79,7 @@ class CollectionValue;
using namespace ErrorCode;
-constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
+constexpr int ALTER_TABLE_BATCH_SIZE = 4064;
class MultiBlockMerger {
public:
@@ -91,7 +88,7 @@ public:
Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
RowsetWriter* rowset_writer, uint64_t* merged_rows) {
int rows = 0;
- for (auto& block : blocks) {
+ for (const auto& block : blocks) {
rows += block->rows();
}
if (!rows) {
@@ -100,7 +97,7 @@ public:
std::vector<RowRef> row_refs;
row_refs.reserve(rows);
- for (auto& block : blocks) {
+ for (const auto& block : blocks) {
for (uint16_t i = 0; i < block->rows(); i++) {
row_refs.emplace_back(block.get(), i);
}
@@ -127,7 +124,7 @@ public:
vectorized::AGG_LOAD_SUFFIX);
agg_functions.push_back(function);
// create aggregate data
- vectorized::AggregateDataPtr place = new
char[function->size_of_data()];
+ auto* place = new char[function->size_of_data()];
function->create(place);
agg_places.push_back(place);
} catch (...) {
@@ -150,7 +147,7 @@ public:
auto row_ref = row_refs[i];
for (int j = key_number; j < columns; j++) {
- auto column_ptr = row_ref.get_column(j).get();
+ const auto* column_ptr = row_ref.get_column(j).get();
agg_functions[j - key_number]->add(
agg_places[j - key_number],
const_cast<const
vectorized::IColumn**>(&column_ptr), row_ref.position,
@@ -243,7 +240,7 @@ private:
};
BlockChanger::BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl
desc_tbl)
- : _desc_tbl(desc_tbl) {
+ : _desc_tbl(std::move(desc_tbl)) {
_schema_mapping.resize(tablet_schema->num_columns());
}
@@ -307,22 +304,16 @@ Status BlockChanger::change_block(vectorized::Block*
ref_block,
_type));
swap_idx_list.emplace_back(result_column_id, idx);
} else if (_schema_mapping[idx].ref_column < 0) {
- if (_type != ROLLUP) {
- // new column, write default value
- auto* value = _schema_mapping[idx].default_value;
- auto column =
new_block->get_by_position(idx).column->assume_mutable();
- if (value->is_null()) {
- DCHECK(column->is_nullable());
- column->insert_many_defaults(row_size);
- } else {
- auto type_info =
get_type_info(_schema_mapping[idx].new_column);
-
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
-
value->ptr(), column, row_size);
- }
+ // new column, write default value
+ auto* value = _schema_mapping[idx].default_value;
+ auto column =
new_block->get_by_position(idx).column->assume_mutable();
+ if (value->is_null()) {
+ DCHECK(column->is_nullable());
+ column->insert_many_defaults(row_size);
} else {
- return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "rollup job meet invalid ref_column, new_column={}",
- _schema_mapping[idx].new_column->name());
+ auto type_info =
get_type_info(_schema_mapping[idx].new_column);
+
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
+ value->ptr(),
column, row_size);
}
} else {
// same type, just swap column
@@ -378,7 +369,7 @@ Status
BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
}
if (ref_column->is_nullable() != new_column->is_nullable()) {
if (ref_column->is_nullable()) {
- auto* ref_null_map =
+ const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
->get_null_map_column()
.get_data()
@@ -392,7 +383,7 @@ Status
BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
return Status::DataQualityError("Null data is changed to not
nullable");
}
} else {
- auto* new_null_map =
+ const auto* new_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
->get_null_map_column()
.get_data()
@@ -409,12 +400,12 @@ Status
BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
}
if (ref_column->is_nullable() && new_column->is_nullable()) {
- auto* ref_null_map =
+ const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
->get_null_map_column()
.get_data()
.data();
- auto* new_null_map =
+ const auto* new_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
->get_null_map_column()
.get_data()
@@ -461,12 +452,11 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr
rowset_reader, RowsetWr
{rowset_reader->rowset()->rowset_id(), 0, 0},
{rowset_reader->rowset()->rowset_id(), UINT32_MAX,
INT64_MAX},
&origin_delete_bitmap);
- for (auto iter = origin_delete_bitmap.delete_bitmap.begin();
- iter != origin_delete_bitmap.delete_bitmap.end(); ++iter) {
+ for (auto& iter : origin_delete_bitmap.delete_bitmap) {
int ret = new_tablet->tablet_meta()->delete_bitmap().set(
- {rowset_writer->rowset_id(), std::get<1>(iter->first),
- std::get<2>(iter->first)},
- iter->second);
+ {rowset_writer->rowset_id(), std::get<1>(iter.first),
+ std::get<2>(iter.first)},
+ iter.second);
DCHECK(ret == 1);
}
}
@@ -833,7 +823,7 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
std::vector<RowsetMetaSharedPtr> del_preds;
for (auto&& split : rs_splits) {
- auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
+ const auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
if (!rs_meta->has_delete_predicate() ||
rs_meta->start_version() > end_version) {
continue;
}
@@ -912,7 +902,7 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
}
sc_params.materialized_params_map.insert(
- std::make_pair(item.column_name, mv_param));
+ std::make_pair(to_lower(item.column_name), mv_param));
}
}
{
@@ -1043,9 +1033,9 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
// find end version
int32_t end_version = -1;
- for (size_t i = 0; i < sc_params.ref_rowset_readers.size(); ++i) {
- if (sc_params.ref_rowset_readers[i]->version().second > end_version) {
- end_version = sc_params.ref_rowset_readers[i]->version().second;
+ for (const auto& ref_rowset_reader : sc_params.ref_rowset_readers) {
+ if (ref_rowset_reader->version().second > end_version) {
+ end_version = ref_rowset_reader->version().second;
}
}
@@ -1096,7 +1086,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
auto sc_procedure = get_sc_procedure(changer, sc_sorting, sc_directly);
// c.Convert historical data
- for (auto& rs_reader : sc_params.ref_rowset_readers) {
+ for (const auto& rs_reader : sc_params.ref_rowset_readers) {
VLOG_TRACE << "begin to convert a history rowset. version=" <<
rs_reader->version().first
<< "-" << rs_reader->version().second;
@@ -1169,6 +1159,8 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
return process_alter_exit();
}
+static const std::string WHERE_SIGN_LOWER = to_lower("__DORIS_WHERE_SIGN__");
+
// @static
// Analyze the mapping of the column and the mapping of the filter key
Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
@@ -1183,37 +1175,51 @@ Status SchemaChangeHandler::_parse_request(const
SchemaChangeParams& sc_params,
const std::unordered_map<std::string, AlterMaterializedViewParam>&
materialized_function_map =
sc_params.materialized_params_map;
DescriptorTbl desc_tbl = *sc_params.desc_tbl;
+
+ if (sc_params.alter_tablet_type == ROLLUP) {
+ *sc_directly = true;
+ }
+
// set column mapping
for (int i = 0, new_schema_size =
new_tablet->tablet_schema()->num_columns();
i < new_schema_size; ++i) {
const TabletColumn& new_column =
new_tablet->tablet_schema()->column(i);
- const std::string& column_name = new_column.name();
+ const std::string& column_name_lower = to_lower(new_column.name());
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;
- if (materialized_function_map.find(column_name) !=
materialized_function_map.end()) {
- auto mvParam = materialized_function_map.find(column_name)->second;
- column_mapping->expr = mvParam.expr;
- int32_t column_index =
base_tablet_schema->field_index(mvParam.origin_column_name);
- if (column_index >= 0) {
- column_mapping->ref_column = column_index;
+ if (materialized_function_map.find(column_name_lower) !=
materialized_function_map.end()) {
+ auto mv_param =
materialized_function_map.find(column_name_lower)->second;
+ column_mapping->expr = mv_param.expr;
+ int32_t column_index =
base_tablet_schema->field_index(mv_param.origin_column_name);
+ column_mapping->ref_column = column_index;
+ if (column_index >= 0 || column_mapping->expr != nullptr) {
continue;
- } else if (sc_params.alter_tablet_type != ROLLUP) {
- return Status::Error<CE_CMD_PARAMS_ERROR>(
- "referenced column was missing. [column={}
,origin_column={}]", column_name,
- mvParam.origin_column_name);
}
}
- int32_t column_index = base_tablet_schema->field_index(column_name);
+ int32_t column_index =
base_tablet_schema->field_index(new_column.name());
if (column_index >= 0) {
column_mapping->ref_column = column_index;
continue;
}
- if (column_name.find("__doris_shadow_") == 0) {
+ if (sc_params.alter_tablet_type == ROLLUP) {
+ std::string materialized_function_map_str;
+ for (auto str : materialized_function_map) {
+ if (!materialized_function_map_str.empty()) {
+ materialized_function_map_str += ',';
+ }
+ materialized_function_map_str += str.first;
+ }
+ return Status::InternalError(
+ "referenced column was missing.
[column={},materialized_function_map={}]",
+ new_column.name(), materialized_function_map_str);
+ }
+
+ if (new_column.name().find("__doris_shadow_") == 0) {
// Should delete in the future, just a protection for bug.
- LOG(INFO) << "a shadow column is encountered " << column_name;
+ LOG(INFO) << "a shadow column is encountered " <<
new_column.name();
return Status::InternalError("failed due to operate on shadow
column");
}
// Newly added column go here
@@ -1226,12 +1232,13 @@ Status SchemaChangeHandler::_parse_request(const
SchemaChangeParams& sc_params,
_init_column_mapping(column_mapping, new_column,
new_column.default_value()));
LOG(INFO) << "A column with default value will be added after schema
changing. "
- << "column=" << column_name << ", default_value=" <<
new_column.default_value()
- << " to table " << new_tablet->get_table_id();
+ << "column=" << new_column.name()
+ << ", default_value=" << new_column.default_value() << " to
table "
+ << new_tablet->get_table_id();
}
- if (materialized_function_map.count(WHERE_SIGN)) {
-
changer->set_where_expr(materialized_function_map.find(WHERE_SIGN)->second.expr);
+ if (materialized_function_map.contains(WHERE_SIGN_LOWER)) {
+
changer->set_where_expr(materialized_function_map.find(WHERE_SIGN_LOWER)->second.expr);
}
// Check if re-aggregation is needed.
@@ -1325,7 +1332,7 @@ Status SchemaChangeHandler::_parse_request(const
SchemaChangeParams& sc_params,
// use directly schema change instead.
if (!(*sc_directly) && !(*sc_sorting)) {
// check has remote rowset
- for (auto& rs_reader : sc_params.ref_rowset_readers) {
+ for (const auto& rs_reader : sc_params.ref_rowset_readers) {
if (!rs_reader->rowset()->is_local()) {
*sc_directly = true;
break;
diff --git a/regression-test/suites/mv_p0/no_await/no_await.groovy
b/regression-test/suites/mv_p0/no_await/no_await.groovy
index 526e3acde31..ec3e7f89ca0 100644
--- a/regression-test/suites/mv_p0/no_await/no_await.groovy
+++ b/regression-test/suites/mv_p0/no_await/no_await.groovy
@@ -26,7 +26,7 @@ suite ("no_await") {
sql "sync;"
while (!result.contains("FINISHED")) {
result = (sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE
TableName='${tblName}' ORDER BY CreateTime DESC LIMIT 1;")[0]
- if (result.contains("CANCELLED")) {
+ if
(!result.contains("RUNNING")&&!result.contains("PENDING")&&!result.contains("FINISHED")&&!result.contains("WAITING_TXN"))
{
log.info("result: ${result}")
assertTrue(false)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]