xiaokang commented on code in PR #24554:
URL: https://github.com/apache/doris/pull/24554#discussion_r1336778138
##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -759,76 +766,130 @@ Status
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
return Status::OK();
}
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
- TabletSchemaSPtr&
flush_schema) {
- if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+ TabletSchemaSPtr&
flush_schema) {
+ size_t num_rows = block.rows();
+ if (num_rows == 0) {
return Status::OK();
}
- // Sanitize block to match exactly from the same type of frontend meta
- vectorized::schema_util::FullBaseSchemaView schema_view;
- schema_view.table_id = _context.tablet_schema->table_id();
- vectorized::ColumnWithTypeAndName* variant_column =
- block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
- if (!variant_column) {
- return Status::OK();
+ std::vector<int> variant_column_pos;
+ if (_context.tablet_schema->is_partial_update()) {
+ // check columns that used to do partial updates should not include
variant
+ for (int i : _context.tablet_schema->get_update_cids()) {
+ if (_context.tablet_schema->columns()[i].is_variant_type()) {
+ return Status::InvalidArgument("Not implement partial updates
for variant");
+ }
+ }
+ } else {
+ for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+ if (_context.tablet_schema->columns()[i].is_variant_type()) {
+ variant_column_pos.push_back(i);
+ }
+ }
}
- auto base_column = variant_column->column;
- vectorized::ColumnObject& object_column =
-
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
- if (object_column.empty()) {
- block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+ if (variant_column_pos.empty()) {
return Status::OK();
}
- object_column.finalize();
- // Has extended columns
-
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+ try {
+ // Parse each variant column from raw string column
+ vectorized::schema_util::parse_variant_columns(block,
variant_column_pos);
+ vectorized::schema_util::finalize_variant_columns(block,
variant_column_pos,
+ false /*not ingore
sparse*/);
+ vectorized::schema_util::encode_variant_sparse_subcolumns(block,
variant_column_pos);
+ } catch (const doris::Exception& e) {
+ // TODO more graceful, max_filter_ratio
+ LOG(WARNING) << "encounter execption " << e.to_string();
+ return Status::InternalError(e.to_string());
+ }
+
// Dynamic Block consists of two parts, dynamic part of columns and static
part of columns
- // static dynamic
- // | ----- | ------- |
+ // static extracted
+ // | --------- | ----------- |
// The static ones are original _tablet_schame columns
- flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+ flush_schema = std::make_shared<TabletSchema>();
+ flush_schema->copy_from(*_context.tablet_schema);
vectorized::Block flush_block(std::move(block));
- // The dynamic ones are auto generated and extended, append them the the
orig_block
- for (auto& entry : object_column.get_subcolumns()) {
- const std::string& column_name = entry->path.get_path();
- auto column_iter = schema_view.column_name_to_column.find(column_name);
- if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
- // Column maybe dropped by light weight schema change DDL
- continue;
- }
- TabletColumn column(column_iter->second);
- auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(
- column, column.is_nullable());
- // Dynamic generated columns does not appear in original tablet schema
- if (_context.tablet_schema->field_index(column.name()) < 0) {
- flush_schema->append_column(column);
- flush_block.insert({data_type->create_column(), data_type,
column.name()});
+
+ // If column already exist in original tablet schema, then we pick common
type
Review Comment:
There is no logic for a column that already exists in the tablet schema in function
append_column. Is it in the function call to get_least_common_schema below?
##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -759,76 +766,130 @@ Status
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
return Status::OK();
}
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
- TabletSchemaSPtr&
flush_schema) {
- if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+ TabletSchemaSPtr&
flush_schema) {
+ size_t num_rows = block.rows();
+ if (num_rows == 0) {
return Status::OK();
}
- // Sanitize block to match exactly from the same type of frontend meta
- vectorized::schema_util::FullBaseSchemaView schema_view;
- schema_view.table_id = _context.tablet_schema->table_id();
- vectorized::ColumnWithTypeAndName* variant_column =
- block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
- if (!variant_column) {
- return Status::OK();
+ std::vector<int> variant_column_pos;
+ if (_context.tablet_schema->is_partial_update()) {
+ // check columns that used to do partial updates should not include
variant
+ for (int i : _context.tablet_schema->get_update_cids()) {
+ if (_context.tablet_schema->columns()[i].is_variant_type()) {
+ return Status::InvalidArgument("Not implement partial updates
for variant");
+ }
+ }
+ } else {
+ for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+ if (_context.tablet_schema->columns()[i].is_variant_type()) {
+ variant_column_pos.push_back(i);
+ }
+ }
}
- auto base_column = variant_column->column;
- vectorized::ColumnObject& object_column =
-
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
- if (object_column.empty()) {
- block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+ if (variant_column_pos.empty()) {
return Status::OK();
}
- object_column.finalize();
- // Has extended columns
-
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+ try {
+ // Parse each variant column from raw string column
+ vectorized::schema_util::parse_variant_columns(block,
variant_column_pos);
+ vectorized::schema_util::finalize_variant_columns(block,
variant_column_pos,
+ false /*not ingore
sparse*/);
+ vectorized::schema_util::encode_variant_sparse_subcolumns(block,
variant_column_pos);
+ } catch (const doris::Exception& e) {
+ // TODO more graceful, max_filter_ratio
+ LOG(WARNING) << "encounter execption " << e.to_string();
+ return Status::InternalError(e.to_string());
+ }
+
// Dynamic Block consists of two parts, dynamic part of columns and static
part of columns
- // static dynamic
- // | ----- | ------- |
+ // static extracted
+ // | --------- | ----------- |
// The static ones are original _tablet_schame columns
- flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+ flush_schema = std::make_shared<TabletSchema>();
+ flush_schema->copy_from(*_context.tablet_schema);
vectorized::Block flush_block(std::move(block));
- // The dynamic ones are auto generated and extended, append them the the
orig_block
- for (auto& entry : object_column.get_subcolumns()) {
- const std::string& column_name = entry->path.get_path();
- auto column_iter = schema_view.column_name_to_column.find(column_name);
- if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
- // Column maybe dropped by light weight schema change DDL
- continue;
- }
- TabletColumn column(column_iter->second);
- auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(
- column, column.is_nullable());
- // Dynamic generated columns does not appear in original tablet schema
- if (_context.tablet_schema->field_index(column.name()) < 0) {
- flush_schema->append_column(column);
- flush_block.insert({data_type->create_column(), data_type,
column.name()});
+
+ // If column already exist in original tablet schema, then we pick common
type
+ // and cast column to common type, and modify tablet column to common type,
+ // otherwise it's a new column, we should add to frontend
+ auto append_column = [&](const TabletColumn& parent_variant, auto&
column_entry_from_object) {
+ const std::string& column_name =
+ parent_variant.name_lower_case() + "." +
column_entry_from_object->path.get_path();
+ const vectorized::DataTypePtr& final_data_type_from_object =
+ column_entry_from_object->data.get_least_common_type();
+ TabletColumn tablet_column;
+ vectorized::PathInDataBuilder full_path_builder;
+ auto full_path =
full_path_builder.append(parent_variant.name_lower_case(), false)
+
.append(column_entry_from_object->path.get_parts(), false)
+ .build();
+ vectorized::schema_util::get_column_by_type(
+ final_data_type_from_object, column_name, tablet_column,
+ vectorized::schema_util::ExtraInfo {.unique_id = -1,
+ .parent_unique_id =
parent_variant.unique_id(),
+ .path_info = full_path});
+ flush_schema->append_column(std::move(tablet_column));
+
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
+ final_data_type_from_object, column_name});
+ };
+
+ // 1. Flatten variant column into flat columns, append flatten columns to
the back of original Block and TabletSchema
+ // those columns are extracted columns, leave none extracted columns
remain in original variant column, which is
+ // JSONB format at present.
+ // 2. Collect columns that need to be added or modified when data type
changes or new columns encountered
+ for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+ size_t variant_pos = variant_column_pos[i];
+ vectorized::ColumnObject& object_column =
assert_cast<vectorized::ColumnObject&>(
+
flush_block.get_by_position(variant_pos).column->assume_mutable_ref());
+ const TabletColumn& parent_column =
_context.tablet_schema->columns()[variant_pos];
+ CHECK(object_column.is_finalized());
Review Comment:
Where is object_column finalized?
##########
be/src/olap/rowset/rowset_writer_context.h:
##########
@@ -40,23 +40,24 @@ struct RowsetWriterContext {
RowsetWriterContext()
: tablet_id(0),
tablet_schema_hash(0),
- index_id(0),
partition_id(0),
+ index_id(0),
rowset_type(BETA_ROWSET),
rowset_state(PREPARED),
version(Version(0, 0)),
txn_id(0),
tablet_uid(0, 0),
- segments_overlap(OVERLAP_UNKNOWN) {
+ segments_overlap(OVERLAP_UNKNOWN),
+ schema_lock(new std::mutex) {
load_id.set_hi(0);
load_id.set_lo(0);
}
RowsetId rowset_id;
int64_t tablet_id;
int64_t tablet_schema_hash;
- int64_t index_id;
int64_t partition_id;
+ int64_t index_id;
Review Comment:
Why change the order?
##########
be/src/olap/rowset/rowset_writer.h:
##########
@@ -151,6 +151,8 @@ class RowsetWriter {
virtual int64_t segment_writer_ns() { return 0; }
+ virtual RowsetWriterContext& mutable_context() = 0;
Review Comment:
Why mutable? It's only called in RowsetBuilder::commit_txn() to read
tablet_schema, so a mutable reference is not necessary.
##########
be/src/olap/tablet.cpp:
##########
@@ -614,9 +614,9 @@ const RowsetSharedPtr Tablet::rowset_with_max_version()
const {
return iter->second;
}
-RowsetMetaSharedPtr Tablet::rowset_meta_with_max_schema_version(
+TabletSchemaSPtr Tablet::tablet_schema_with_max_schema_version(
Review Comment:
In fact, it's the merged tablet schema of all rowsets if there is a variant
column in the max schema version. So the name should be merged_tablet_schema or
something similar.
##########
be/src/vec/common/schema_util.cpp:
##########
@@ -129,266 +139,254 @@ bool is_conversion_required_between_integers(FieldType
lhs, FieldType rhs) {
return true;
}
-Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type,
ColumnPtr* result,
- RuntimeState* state) {
- ColumnsWithTypeAndName arguments;
- if (WhichDataType(type->get_type_id()).is_string()) {
- // Special handle ColumnString, since the original cast logic use
ColumnString's first item
- // as the name of the dest type
- arguments = {arg, {type->create_column_const(1, type->get_name()),
type, ""}};
- } else {
- arguments = {arg, {type->create_column_const_with_default_value(1),
type, ""}};
- }
+Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type,
ColumnPtr* result) {
+ ColumnsWithTypeAndName arguments {
+ arg, {type->create_column_const_with_default_value(1), type,
type->get_name()}};
auto function = SimpleFunctionFactory::instance().get_function("CAST",
arguments, type);
+ if (!function) {
+ return Status::InternalError("Not found cast function {} to {}",
arg.type->get_name(),
+ type->get_name());
+ }
Block tmp_block {arguments};
- // the 0 position is input argument, the 1 position is to type argument,
the 2 position is result argument
vectorized::ColumnNumbers argnum;
argnum.emplace_back(0);
argnum.emplace_back(1);
size_t result_column = tmp_block.columns();
+ auto ctx = FunctionContext::create_context(nullptr, {}, {});
+ // We convert column string to jsonb type just add a string jsonb field to
dst column instead of parse
+ // each line in original string column.
+ ctx->set_string_as_jsonb_string(true);
tmp_block.insert({nullptr, type, arg.name});
- auto need_state_only = FunctionContext::create_context(state, {}, {});
- RETURN_IF_ERROR(function->execute(need_state_only.get(), tmp_block,
argnum, result_column,
- arg.column->size()));
+ RETURN_IF_ERROR(
+ function->execute(ctx.get(), tmp_block, argnum, result_column,
arg.column->size()));
*result = std::move(tmp_block.get_by_position(result_column).column);
+ // Variant column is a really special case, src type is nullable but dst
variant type is none nullable,
+ // but we still need to wrap nullmap into variant root column to prevent
from nullable info lost.
+ // TODO rethink and better handle this sepecial situation
+ if (arg.type->is_nullable() && WhichDataType(type).is_variant_type()) {
+ auto variant = ColumnObject::create(true);
+ auto& old_variant = assert_cast<const
ColumnObject&>(*(*result)->assume_mutable());
+ DCHECK(!old_variant.get_root()->is_nullable());
+ auto nullable = ColumnNullable::create(
+ old_variant.get_root(),
+ assert_cast<const
ColumnNullable&>(*arg.column).get_null_map_column_ptr());
+ variant->create_root(make_nullable(arg.type),
nullable->assume_mutable());
+ *result = std::move(variant);
+ }
return Status::OK();
}
-static void get_column_def(const vectorized::DataTypePtr& data_type, const
std::string& name,
- TColumnDef* column) {
- if (!name.empty()) {
- column->columnDesc.__set_columnName(name);
+void get_column_by_type(const vectorized::DataTypePtr& data_type, const
std::string& name,
+ TabletColumn& column, const ExtraInfo& ext_info) {
+ column.set_name(name);
+ column.set_type(data_type->get_type_as_field_type());
+ if (ext_info.unique_id >= 0) {
+ column.set_unique_id(ext_info.unique_id);
+ }
+ if (ext_info.parent_unique_id >= 0) {
+ column.set_parent_unique_id(ext_info.parent_unique_id);
+ }
+ if (!ext_info.path_info.empty()) {
+ column.set_path_info(ext_info.path_info);
}
if (data_type->is_nullable()) {
const auto& real_type = static_cast<const
DataTypeNullable&>(*data_type);
- column->columnDesc.__set_isAllowNull(true);
- get_column_def(real_type.get_nested_type(), "", column);
+ column.set_is_nullable(true);
+ get_column_by_type(real_type.get_nested_type(), name, column, {});
return;
}
-
column->columnDesc.__set_columnType(data_type->get_type_as_tprimitive_type());
if (data_type->get_type_id() == TypeIndex::Array) {
- TColumnDef child;
- column->columnDesc.__set_children({});
- get_column_def(assert_cast<const
DataTypeArray*>(data_type.get())->get_nested_type(), "",
- &child);
- column->columnDesc.columnLength =
-
TabletColumn::get_field_length_by_type(column->columnDesc.columnType, 0);
- column->columnDesc.children.push_back(child.columnDesc);
+ TabletColumn child;
+ get_column_by_type(assert_cast<const
DataTypeArray*>(data_type.get())->get_nested_type(),
+ "", child, {});
+ column.set_length(TabletColumn::get_field_length_by_type(
+ data_type->get_type_as_tprimitive_type(), 0));
+ column.add_sub_column(child);
+ column.set_default_value("[]");
return;
}
- if (data_type->get_type_id() == TypeIndex::Tuple) {
- // TODO
- // auto tuple_type = assert_cast<const
DataTypeTuple*>(data_type.get());
- // DCHECK_EQ(tuple_type->get_elements().size(),
tuple_type->get_element_names().size());
- // for (size_t i = 0; i < tuple_type->get_elements().size(); ++i) {
- // TColumnDef child;
- // get_column_def(tuple_type->get_element(i),
tuple_type->get_element_names()[i], &child);
- // column->columnDesc.children.push_back(child.columnDesc);
- // }
- // return;
- }
- if (data_type->get_type_id() == TypeIndex::String) {
+ // size is not fixed when type is string or json
+ if (WhichDataType(*data_type).is_string() ||
WhichDataType(*data_type).is_json()) {
return;
}
if (WhichDataType(*data_type).is_simple()) {
-
column->columnDesc.__set_columnLength(data_type->get_size_of_value_in_memory());
+ column.set_length(data_type->get_size_of_value_in_memory());
return;
}
+ // TODO handle more types like struct/date/datetime/decimal...
+ __builtin_unreachable();
}
-// send an empty add columns rpc, the rpc response will fill with base schema
info
-// maybe we could seperate this rpc from add columns rpc
-Status send_fetch_full_base_schema_view_rpc(FullBaseSchemaView* schema_view) {
- TAddColumnsRequest req;
- TAddColumnsResult res;
- TTabletInfo tablet_info;
- req.__set_table_name(schema_view->table_name);
- req.__set_db_name(schema_view->db_name);
- req.__set_table_id(schema_view->table_id);
- // Set empty columns
- req.__set_addColumns({});
- auto master_addr = ExecEnv::GetInstance()->master_info()->network_address;
- Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&req, &res](FrontendServiceConnection& client) {
client->addColumns(res, req); },
- config::txn_commit_rpc_timeout_ms);
- if (!rpc_st.ok()) {
- return Status::InternalError("Failed to fetch schema info, encounter
rpc failure");
- }
- // TODO(lhy) handle more status code
- if (res.status.status_code != TStatusCode::OK) {
- LOG(WARNING) << "failed to fetch schema info, code:" <<
res.status.status_code
- << ", msg:" << res.status.error_msgs[0];
- return Status::InvalidArgument(
- fmt::format("Failed to fetch schema info, {}",
res.status.error_msgs[0]));
- }
- for (const auto& column : res.allColumns) {
- schema_view->column_name_to_column[column.column_name] = column;
+TabletColumn get_least_type_column(const TabletColumn& original, const
DataTypePtr& new_type,
+ const ExtraInfo& ext_info, bool* changed) {
+ TabletColumn result_column;
+ vectorized::DataTypePtr original_type = original.get_vec_type();
+ vectorized::DataTypePtr common_type;
+ vectorized::get_least_supertype<vectorized::LeastSupertypeOnError::Jsonb>(
+ vectorized::DataTypes {original_type, new_type}, &common_type);
+ if (!original_type->equals(*common_type)) {
+ // update to common type
+ *changed = true;
+ vectorized::schema_util::get_column_by_type(common_type,
original.name(), result_column,
+ ext_info);
+ } else {
+ *changed = false;
+ result_column = original;
+ result_column.set_parent_unique_id(ext_info.parent_unique_id);
+ result_column.set_unique_id(ext_info.unique_id);
+ result_column.set_path_info(ext_info.path_info);
}
- schema_view->schema_version = res.schema_version;
- return Status::OK();
+ return result_column;
}
-static const std::regex COLUMN_NAME_REGEX(
- "^[_a-zA-Z@0-9\\s<>/][.a-zA-Z0-9_+-/><?@#$%^&*\"\\s,:]{0,255}$");
-
-// Do batch add columns schema change
-// only the base table supported
-Status send_add_columns_rpc(ColumnsWithTypeAndName column_type_names,
- FullBaseSchemaView* schema_view) {
- if (column_type_names.empty()) {
- return Status::OK();
+void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+ TabletSchemaSPtr& common_schema, int32_t
variant_col_unique_id) {
+ // Types of subcolumns by path from all tuples.
+ std::unordered_map<PathInData, DataTypes, PathInData::Hash>
subcolumns_types;
+ for (const TabletSchemaSPtr& schema : schemas) {
+ for (const TabletColumn& col : schema->columns()) {
+ // Get subcolumns of this variant
+ if (!col.path_info().empty() && col.parent_unique_id() > 0 &&
+ col.parent_unique_id() == variant_col_unique_id) {
+ subcolumns_types[col.path_info()].push_back(
+ DataTypeFactory::instance().create_data_type(col,
col.is_nullable()));
+ }
+ }
}
- TAddColumnsRequest req;
- TAddColumnsResult res;
- TTabletInfo tablet_info;
- req.__set_table_name(schema_view->table_name);
- req.__set_db_name(schema_view->db_name);
- req.__set_table_id(schema_view->table_id);
- // TODO(lhy) more configurable
- req.__set_allow_type_conflict(true);
- req.__set_addColumns({});
- // Deduplicate Column like `Level` and `level`
- // TODO we will implement new version of dynamic column soon to handle
this issue,
- // also ignore column missmatch with regex
- std::set<std::string> dedup;
- for (const auto& column_type_name : column_type_names) {
- if (dedup.contains(to_lower(column_type_name.name))) {
+ PathsInData tuple_paths;
+ DataTypes tuple_types;
+ // Get the least common type for all paths.
+ for (const auto& [key, subtypes] : subcolumns_types) {
+ assert(!subtypes.empty());
+ if (key.get_path() == ColumnObject::COLUMN_NAME_DUMMY) {
continue;
}
- if (!std::regex_match(column_type_name.name, COLUMN_NAME_REGEX)) {
+ size_t first_dim = get_number_of_dimensions(*subtypes[0]);
+ tuple_paths.emplace_back(key);
+ for (size_t i = 1; i < subtypes.size(); ++i) {
+ if (first_dim != get_number_of_dimensions(*subtypes[i])) {
+
tuple_types.emplace_back(make_nullable(std::make_shared<DataTypeJsonb>()));
+ LOG(INFO) << fmt::format(
+ "Uncompatible types of subcolumn '{}': {} and {}, cast
to JSONB",
+ key.get_path(), subtypes[0]->get_name(),
subtypes[i]->get_name());
+ break;
+ }
+ }
+ if (tuple_paths.size() == tuple_types.size()) {
continue;
}
- dedup.insert(to_lower(column_type_name.name));
- TColumnDef col;
- get_column_def(column_type_name.type, column_type_name.name, &col);
- req.addColumns.push_back(col);
- }
- auto master_addr = ExecEnv::GetInstance()->master_info()->network_address;
- Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&req, &res](FrontendServiceConnection& client) {
client->addColumns(res, req); },
- config::txn_commit_rpc_timeout_ms);
- if (!rpc_st.ok()) {
- return Status::InternalError("Failed to do schema change, rpc error");
- }
- // TODO(lhy) handle more status code
- if (res.status.status_code != TStatusCode::OK) {
- LOG(WARNING) << "failed to do schema change, code:" <<
res.status.status_code
- << ", msg:" << res.status.error_msgs[0];
- return Status::InvalidArgument(
- fmt::format("Failed to do schema change, {}",
res.status.error_msgs[0]));
- }
- size_t sz = res.allColumns.size();
- if (sz < dedup.size()) {
- return Status::InternalError(
- fmt::format("Unexpected result columns {}, expected at least
{}",
- res.allColumns.size(), column_type_names.size()));
- }
- for (const auto& column : res.allColumns) {
- schema_view->column_name_to_column[column.column_name] = column;
+ DataTypePtr common_type;
+ get_least_supertype<LeastSupertypeOnError::Jsonb>(subtypes,
&common_type);
+ if (!common_type->is_nullable()) {
+ common_type = make_nullable(common_type);
+ }
+ tuple_types.emplace_back(common_type);
}
- schema_view->schema_version = res.schema_version;
- return Status::OK();
-}
+ CHECK_EQ(tuple_paths.size(), tuple_types.size());
-Status unfold_object(size_t dynamic_col_position, Block& block, bool
cast_to_original_type,
- RuntimeState* state) {
- auto dynamic_col =
block.get_by_position(dynamic_col_position).column->assume_mutable();
- auto* column_object_ptr = assert_cast<ColumnObject*>(dynamic_col.get());
- if (column_object_ptr->empty()) {
- return Status::OK();
+ std::string variant_col_name =
common_schema->column_by_uid(variant_col_unique_id).name();
+ // Append all common type columns of this variant
+ for (int i = 0; i < tuple_paths.size(); ++i) {
+ TabletColumn common_column;
+ // const std::string& column_name = variant_col_name + "." +
tuple_paths[i].get_path();
Review Comment:
Why not use column_name with variant_col_name prefix?
##########
be/src/olap/rowset/segment_v2/encoding_info.cpp:
##########
@@ -284,6 +284,10 @@ EncodingInfoResolver::EncodingInfoResolver() {
_add_map<FieldType::OLAP_FIELD_TYPE_JSONB, PLAIN_ENCODING>();
Review Comment:
Maybe we should use PLAIN_ENCODING and `PLAIN_ENCODING, true` only for JSONB
and VARIANT.
##########
be/src/olap/tablet_meta.cpp:
##########
@@ -363,7 +363,8 @@ void TabletMeta::init_column_from_tcolumn(uint32_t
unique_id, const TColumn& tco
}
for (size_t i = 0; i < tcolumn.children_column.size(); i++) {
ColumnPB* children_column = column->add_children_columns();
- init_column_from_tcolumn(i, tcolumn.children_column[i],
children_column);
+ init_column_from_tcolumn(tcolumn.children_column[i].col_unique_id,
Review Comment:
Is this an old bug?
##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -759,76 +766,130 @@ Status
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
return Status::OK();
}
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
- TabletSchemaSPtr&
flush_schema) {
- if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+ TabletSchemaSPtr&
flush_schema) {
+ size_t num_rows = block.rows();
+ if (num_rows == 0) {
return Status::OK();
}
- // Sanitize block to match exactly from the same type of frontend meta
- vectorized::schema_util::FullBaseSchemaView schema_view;
- schema_view.table_id = _context.tablet_schema->table_id();
- vectorized::ColumnWithTypeAndName* variant_column =
- block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
- if (!variant_column) {
- return Status::OK();
+ std::vector<int> variant_column_pos;
+ if (_context.tablet_schema->is_partial_update()) {
+ // check columns that used to do partial updates should not include
variant
+ for (int i : _context.tablet_schema->get_update_cids()) {
+ if (_context.tablet_schema->columns()[i].is_variant_type()) {
+ return Status::InvalidArgument("Not implement partial updates
for variant");
+ }
+ }
+ } else {
+ for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+ if (_context.tablet_schema->columns()[i].is_variant_type()) {
+ variant_column_pos.push_back(i);
+ }
+ }
}
- auto base_column = variant_column->column;
- vectorized::ColumnObject& object_column =
-
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
- if (object_column.empty()) {
- block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+ if (variant_column_pos.empty()) {
return Status::OK();
}
- object_column.finalize();
- // Has extended columns
-
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+ try {
+ // Parse each variant column from raw string column
+ vectorized::schema_util::parse_variant_columns(block,
variant_column_pos);
+ vectorized::schema_util::finalize_variant_columns(block,
variant_column_pos,
+ false /*not ingore
sparse*/);
+ vectorized::schema_util::encode_variant_sparse_subcolumns(block,
variant_column_pos);
+ } catch (const doris::Exception& e) {
+ // TODO more graceful, max_filter_ratio
+ LOG(WARNING) << "encounter execption " << e.to_string();
+ return Status::InternalError(e.to_string());
+ }
+
// Dynamic Block consists of two parts, dynamic part of columns and static
part of columns
- // static dynamic
- // | ----- | ------- |
+ // static extracted
+ // | --------- | ----------- |
// The static ones are original _tablet_schame columns
- flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+ flush_schema = std::make_shared<TabletSchema>();
+ flush_schema->copy_from(*_context.tablet_schema);
vectorized::Block flush_block(std::move(block));
- // The dynamic ones are auto generated and extended, append them the the
orig_block
- for (auto& entry : object_column.get_subcolumns()) {
- const std::string& column_name = entry->path.get_path();
- auto column_iter = schema_view.column_name_to_column.find(column_name);
- if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
- // Column maybe dropped by light weight schema change DDL
- continue;
- }
- TabletColumn column(column_iter->second);
- auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(
- column, column.is_nullable());
- // Dynamic generated columns does not appear in original tablet schema
- if (_context.tablet_schema->field_index(column.name()) < 0) {
- flush_schema->append_column(column);
- flush_block.insert({data_type->create_column(), data_type,
column.name()});
+
+ // If column already exist in original tablet schema, then we pick common
type
+ // and cast column to common type, and modify tablet column to common type,
+ // otherwise it's a new column, we should add to frontend
+ auto append_column = [&](const TabletColumn& parent_variant, auto&
column_entry_from_object) {
+ const std::string& column_name =
+ parent_variant.name_lower_case() + "." +
column_entry_from_object->path.get_path();
+ const vectorized::DataTypePtr& final_data_type_from_object =
+ column_entry_from_object->data.get_least_common_type();
+ TabletColumn tablet_column;
+ vectorized::PathInDataBuilder full_path_builder;
+ auto full_path =
full_path_builder.append(parent_variant.name_lower_case(), false)
+
.append(column_entry_from_object->path.get_parts(), false)
+ .build();
+ vectorized::schema_util::get_column_by_type(
+ final_data_type_from_object, column_name, tablet_column,
+ vectorized::schema_util::ExtraInfo {.unique_id = -1,
+ .parent_unique_id =
parent_variant.unique_id(),
+ .path_info = full_path});
+ flush_schema->append_column(std::move(tablet_column));
+
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
+ final_data_type_from_object, column_name});
+ };
+
+ // 1. Flatten variant column into flat columns, append flatten columns to
the back of original Block and TabletSchema
+ // those columns are extracted columns, leave none extracted columns
remain in original variant column, which is
+ // JSONB format at present.
+ // 2. Collect columns that need to be added or modified when data type
changes or new columns encountered
+ for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+ size_t variant_pos = variant_column_pos[i];
+ vectorized::ColumnObject& object_column =
assert_cast<vectorized::ColumnObject&>(
+
flush_block.get_by_position(variant_pos).column->assume_mutable_ref());
+ const TabletColumn& parent_column =
_context.tablet_schema->columns()[variant_pos];
+ CHECK(object_column.is_finalized());
+ std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
+ for (auto& entry : object_column.get_subcolumns()) {
+ if (entry->path.empty()) {
+ // root
+ root = entry;
+ continue;
+ }
+ append_column(parent_column, entry);
}
+ // Create new variant column and set root column
+ auto obj = vectorized::ColumnObject::create(true, false);
+ // '{}' indicates a root path
+ static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
+ {}, root->data.get_finalized_column_ptr()->assume_mutable(),
+ root->data.get_least_common_type());
+ flush_block.get_by_position(variant_pos).column = obj->get_ptr();
+ vectorized::PathInDataBuilder full_root_path_builder;
+ auto full_root_path =
+ full_root_path_builder.append(parent_column.name_lower_case(),
false).build();
+
flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path);
+ VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
}
- // Ensure column are all present at this schema version.Otherwise there
will be some senario:
- // Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added
columns and schema version became 10
- // Load2 -> version(10) with schema [a, b, c] and has no extended columns
and fetched the schema at version 10
- // Load2 will persist meta with [a, b, c] but Load1 will persist meta
with [a, b, c, d, e]
- // So we should make sure that rowset at the same schema version alawys
contain the same size of columns.
- // so that all columns at schema_version is in either
_context.tablet_schema or schema_change_recorder
- for (const auto& [name, column] : schema_view.column_name_to_column) {
- if (_context.tablet_schema->field_index(name) == -1) {
- const auto& tcolumn = schema_view.column_name_to_column[name];
- TabletColumn new_column(tcolumn);
- _context.schema_change_recorder->add_extended_columns(column,
-
schema_view.schema_version);
- }
+ {
+ // Update rowset schema, tablet's tablet schema will be updated when
build Rowset
+ // Eg. flush schema: A(int), B(float), C(int), D(int)
+ // ctx.tablet_schema: A(bigint), B(double)
+ // => update_schema: A(bigint), B(double), C(int), D(int)
+ std::lock_guard<std::mutex> lock(*(_context.schema_lock));
Review Comment:
Is the lock necessary? Is _context shared between different rowsets or
different segments of the same rowset?
##########
be/src/olap/tablet.cpp:
##########
@@ -626,6 +626,18 @@ RowsetMetaSharedPtr
Tablet::rowset_meta_with_max_schema_version(
:
a->tablet_schema()->schema_version() <
b->tablet_schema()->schema_version());
});
+ TabletSchemaSPtr target_schema = max_schema_version_rs->tablet_schema();
+ if (target_schema->num_variant_columns() > 0) {
+ // For variant columns tablet schema need to be the merged wide tablet
schema
+ std::vector<TabletSchemaSPtr> schemas;
+ std::transform(rowset_metas.begin(), rowset_metas.end(),
std::back_inserter(schemas),
+ [](const RowsetMetaSharedPtr& rs_meta) { return
rs_meta->tablet_schema(); });
+ target_schema = std::make_shared<TabletSchema>();
+ // TODO(lhy) maybe slow?
+ vectorized::schema_util::get_least_common_schema(schemas,
target_schema);
Review Comment:
Why merge all rowset metas if there is a variant column in the max version schema?
Why not just use the max version schema?
##########
be/src/olap/rowset/beta_rowset_writer_v2.h:
##########
@@ -104,6 +100,8 @@ class BetaRowsetWriterV2 : public RowsetWriter {
return nullptr;
}
+ RowsetWriterContext& mutable_context() override { LOG(FATAL) << "not
implemented"; }
Review Comment:
What's the difference between BetaRowsetWriterV2 and BetaRowsetWriter? And
why not implement it in BetaRowsetWriterV2?
##########
be/src/vec/olap/olap_data_convertor.cpp:
##########
@@ -587,28 +594,17 @@ const void*
OlapBlockDataConvertor::OlapColumnDataConvertorVarChar::get_data_at(
return null_flag ? nullptr : _slice.data() + offset;
}
-Status
OlapBlockDataConvertor::OlapColumnDataConvertorVarChar::convert_to_olap() {
- assert(_typed_column.column);
- const vectorized::ColumnString* column_string = nullptr;
- if (_nullmap) {
- auto nullable_column =
- assert_cast<const
vectorized::ColumnNullable*>(_typed_column.column.get());
- column_string = assert_cast<const vectorized::ColumnString*>(
- nullable_column->get_nested_column_ptr().get());
- } else {
- column_string = assert_cast<const
vectorized::ColumnString*>(_typed_column.column.get());
- }
-
+Status OlapBlockDataConvertor::OlapColumnDataConvertorVarChar::convert_to_olap(
Review Comment:
Why change OlapColumnDataConvertorVarChar?
##########
be/src/olap/rowset_builder.cpp:
##########
@@ -266,6 +267,20 @@ Status RowsetBuilder::commit_txn() {
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_commit_txn_timer);
+
+ if (_tablet->tablet_schema()->num_variant_columns() > 0) {
Review Comment:
Is this for memtable move forward?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]